Convert DataChangeListeners to DataTreeChangeListeners
[nemo.git] / nemo-renderers / openflow-renderer / src / main / java / org / opendaylight / nemo / renderer / openflow / physicalnetwork / PhysicalNetworkAdapter.java
1 /*
2  * Copyright (c) 2015 Huawei, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.nemo.renderer.openflow.physicalnetwork;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Timer;
18 import java.util.TimerTask;
19 import java.util.concurrent.CopyOnWriteArraySet;
20 import java.util.concurrent.CountDownLatch;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
25 import org.opendaylight.nemo.renderer.openflow.FlowUtils;
26 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.generic.physical.network.rev151010.physical.network.physical.links.PhysicalLink;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.generic.physical.network.rev151010.physical.network.physical.links.PhysicalLinkBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.generic.physical.network.rev151010.physical.network.physical.links.PhysicalLinkKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.generic.physical.network.rev151010.physical.network.physical.nodes.PhysicalNode;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.generic.physical.network.rev151010.physical.network.physical.nodes.PhysicalNodeBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.generic.physical.network.rev151010.physical.network.physical.nodes.PhysicalNodeKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.generic.physical.network.rev151010.physical.node.instance.PhysicalPort;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.generic.physical.network.rev151010.physical.node.instance.PhysicalPortBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.generic.physical.network.rev151010.physical.node.instance.PhysicalPortKey;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.nemo.engine.common.rev151010.PhysicalLinkId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.nemo.engine.common.rev151010.PhysicalNodeId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.nemo.engine.common.rev151010.PhysicalPortId;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
50 import org.opendaylight.yangtools.concepts.ListenerRegistration;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.opendaylight.yangtools.yang.binding.NotificationListener;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 public class PhysicalNetworkAdapter {
57     private static final Logger log = LoggerFactory.getLogger(PhysicalNetworkAdapter.class);
58     private static final String DEFAULT_TOPOLOGY_ID = "flow:1";
59
60     final private DataBroker dataBroker;
61     private final PhyConfigLoader phyConfigLoader;
62     private final DataBrokerAdapter dataBrokerAdapter;
63     private final PhysicalFlowUtils physicalFlowUtils;
64     private final FlowUtils ofFlowUtils;
65     private final NotificationProviderService notificationProviderService;
66
67     private final CopyOnWriteArraySet<String> nodeIdSet;
68     private final CopyOnWriteArraySet<PhysicalLink> physicalLinkSet;
69     private final Timer phyTimer;
70     private boolean running = false;
71     private final Integer mutex = 0;
72     private ListenerRegistration<NotificationListener> ofPacketInListenerReg;
73     private ListenerRegistration<?> ofNodesListenerReg;
74     private ListenerRegistration<?> ofLinksListenerReg;
75
76     public PhysicalNetworkAdapter(DataBroker dataBroker
77             , NotificationProviderService notificationProviderService
78             , PhyConfigLoader phyConfigLoader
79             , FlowUtils ofFlowUtils) {
80         this.dataBroker = dataBroker;
81         this.notificationProviderService = notificationProviderService;
82         this.ofFlowUtils = ofFlowUtils;
83         this.phyConfigLoader = phyConfigLoader;
84         this.dataBrokerAdapter = new DataBrokerAdapter(dataBroker);
85         physicalFlowUtils = new PhysicalFlowUtils(dataBroker);
86         nodeIdSet = new CopyOnWriteArraySet<>();
87         physicalLinkSet = new CopyOnWriteArraySet<>();
88
89         phyTimer = new Timer();
90
91
92         registerListeners();
93         initOFNodes();
94         initOFLinks();
95     }
96
97     public void close() {
98         if (ofPacketInListenerReg != null) {
99             ofPacketInListenerReg.close();
100         }
101         if (ofLinksListenerReg != null) {
102             ofLinksListenerReg.close();
103         }
104         if (ofNodesListenerReg != null) {
105             ofNodesListenerReg.close();
106         }
107         if (phyConfigLoader != null) {
108             phyConfigLoader.close();
109         }
110         log.debug("Clear....\r\n{}", nodeIdSet);
111         nodeIdSet.clear();
112         physicalLinkSet.clear();
113     }
114
115     public PhyConfigLoader getPhyConfigLoader() {
116         return phyConfigLoader;
117     }
118
119     /**
120      * OFNode instance identifier
121      *
122      * @return
123      */
124     private InstanceIdentifier<Node> getOFNodeInstanceIdentifier() {
125         return InstanceIdentifier.builder(Nodes.class).child(Node.class).build();
126     }
127
128     private InstanceIdentifier<Link> getOFLinkInstanceIdentifier() {
129         return InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, new TopologyKey(new TopologyId(DEFAULT_TOPOLOGY_ID))).child(Link.class).build();
130     }
131
132     private InstanceIdentifier<Topology> getOFTopologyInstanceIdentifier() {
133         return InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, new TopologyKey(new TopologyId(DEFAULT_TOPOLOGY_ID))).build();
134     }
135
136     private InstanceIdentifier<Nodes> getOFNodesInstanceIdentifier() {
137         return InstanceIdentifier.builder(Nodes.class).build();
138     }
139
140     private InstanceIdentifier<NodeConnector> getOFPortInstanceIdentifier(NodeKey nodeKey, NodeConnectorKey connectorKey) {
141         return InstanceIdentifier.builder(Nodes.class).child(Node.class, nodeKey).child(NodeConnector.class, connectorKey).build();
142     }
143
144     private void registerListeners() {
145         InstanceIdentifier<Node> nodeInsId = getOFNodeInstanceIdentifier();
146         InstanceIdentifier<Link> linkInsId = getOFLinkInstanceIdentifier();
147         ofNodesListenerReg = dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(
148                 LogicalDatastoreType.OPERATIONAL, nodeInsId),  new OFNodeListener(this));
149         ofLinksListenerReg = dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(
150                 LogicalDatastoreType.OPERATIONAL, linkInsId), new OFLinkListener(this));
151         ofPacketInListenerReg = notificationProviderService.registerNotificationListener(new OFPacketInListener(ofFlowUtils));
152     }
153
154     private void initOFLinks() {
155         InstanceIdentifier<Topology> topologyInsId = getOFTopologyInstanceIdentifier();
156         ListenableFuture<Optional<Topology>> topologyFuture = dataBroker.newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL, topologyInsId);
157         Futures.addCallback(topologyFuture, new FutureCallback<Optional<Topology>>() {
158             @Override
159             public void onSuccess(Optional<Topology> result) {
160                 if (result.isPresent() && result.get() instanceof Topology) {
161                     Topology topology = result.get();
162                     if (topology != null && topology.getLink() != null) {
163                         for (Link link : topology.getLink()) {
164                             ofLinkAdded(link);
165                         }
166                     }
167                 }
168
169                 return;
170             }
171
172             @Override
173             public void onFailure(Throwable t) {
174                 log.error("Can not read the link info of topology {}: {}", DEFAULT_TOPOLOGY_ID, t);
175                 return;
176             }
177         });
178     }
179
180     private void initOFNodes() {
181         InstanceIdentifier<Nodes> nodesInsId = getOFNodesInstanceIdentifier();
182         ListenableFuture<Optional<Nodes>> nodesFuture = dataBroker.newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL, nodesInsId);
183         Futures.addCallback(nodesFuture, new FutureCallback<Optional<Nodes>>() {
184             @Override
185             public void onSuccess(Optional<Nodes> result) {
186                 if (result.isPresent() && result.get() instanceof Nodes) {
187                     Nodes nodes = result.get();
188                     if (nodes != null && nodes.getNode() != null) {
189                         for (Node node : nodes.getNode()) {
190                             ofNodeAdded(node);
191                         }
192                     }
193                 }
194                 return;
195             }
196
197             @Override
198             public void onFailure(Throwable t) {
199                 log.error("Can not read node information: {}", t);
200                 return;
201             }
202         });
203
204     }
205
206     protected void ofNodeAdded(Node node) {
207         log.debug("OF node added: {}.", node.getKey());
208         String strNodeId = node.getId().getValue();
209         // Add default flow entry. - Don't do this here, because it will
210         // result in that the openflow plugin can't discover the topology.
211         // Flow entries for LLDP are deployed manually through a shell script.
212         // Flow entries for ARP are added in class FlowUtils.
213 //        physicalFlowUtils.configArpPEntry(strNodeId);
214 //        physicalFlowUtils.configLLDPEntry(strNodeId);
215
216         PhysicalNodeId nodeId = new PhysicalNodeId(strNodeId);
217         PhysicalNodeBuilder nodeBuilder = new PhysicalNodeBuilder();
218         nodeBuilder.setNodeId(nodeId);
219         nodeBuilder.setKey(new PhysicalNodeKey(nodeId));
220         List<PhysicalPort> physicalPortList = new ArrayList<>();
221         List<NodeConnector> nodeConnectors = node.getNodeConnector();
222         if (nodeConnectors == null || nodeConnectors.size() == 0) {
223             log.error("Node : {}, without port.", strNodeId);
224         }
225         if (nodeConnectors != null) {
226             for (NodeConnector nodeConnector : nodeConnectors) {
227                 PhysicalPort physicalPort = getPhysicalPort(node.getKey(), nodeConnector);
228                 if (physicalPort != null) {
229                     physicalPortList.add(physicalPort);
230                 }
231             }
232         }
233         nodeBuilder.setPhysicalPort(physicalPortList);
234
235         PhysicalNode confPhyNode = phyConfigLoader.getPhysicalNode(nodeId);
236         if (confPhyNode != null) {
237             nodeBuilder.setNodeType(confPhyNode.getNodeType());
238             nodeBuilder.setAttribute(confPhyNode.getAttribute());
239         } else {
240             log.warn("Find one OF Node {},does not have info in config file.", node.getKey());
241         }
242         boolean result = dataBrokerAdapter.addPhysicalNode(nodeBuilder.build());
243 //        if (result) {
244         nodeIdSet.add(strNodeId);
245         log.debug("Add....{}\r\n{}", strNodeId, nodeIdSet);
246 //        }
247     }
248
249     protected void ofNodeRemoved(Node node) {
250         log.debug("OF node removed: {}.", node.getKey());
251         String strNodeId = node.getId().getValue();
252         PhysicalNodeId nodeId = new PhysicalNodeId(strNodeId);
253         PhysicalNode confPhyNode = phyConfigLoader.getPhysicalNode(nodeId);
254         if (confPhyNode == null) {
255             log.warn("Find one OF Node removed {},does not have info in config file.", node.getKey());
256         }
257         boolean result = dataBrokerAdapter.removePhysicalNode(new PhysicalNodeKey(nodeId));
258 //        if (result) {
259         nodeIdSet.remove(strNodeId);
260         log.debug("Remove....{}\r\n{}", strNodeId, nodeIdSet);
261 //        }
262     }
263
264     private PhysicalPort getPhysicalPort(NodeKey nodeKey, NodeConnector nodeConnector) {
265         String strConnectorId = nodeConnector.getId().getValue();
266         if (strConnectorId.contains("LOCAL")) {
267             return null;
268         }
269         PhysicalPortId physicalPortId = new PhysicalPortId(strConnectorId);
270         log.debug("Get port {} : {}.", nodeKey, nodeConnector.getId().getValue());
271         FlowCapableNodeConnector flowCapableNodeConnector = getOFPort(nodeKey, nodeConnector.getKey());
272         if (flowCapableNodeConnector != null) {
273             PhysicalPortBuilder physicalPortBuilder = new PhysicalPortBuilder();
274             physicalPortBuilder.setPortId(physicalPortId);
275             physicalPortBuilder.setKey(new PhysicalPortKey(physicalPortId));
276             physicalPortBuilder.setBandwidth(PhyConfigLoader.DEFAULT_LINK_BANDWIDTH);
277             MacAddress macAddress = flowCapableNodeConnector.getHardwareAddress();
278             physicalPortBuilder.setMacAddress(macAddress);
279
280             PhysicalPort confPhyPort = phyConfigLoader.getPhysicalPort(physicalPortId);
281             if (confPhyPort != null) {
282                 log.debug("Set port {} : {}.\r\n {} \r\n{}", nodeKey, nodeConnector.getId().getValue(), confPhyPort.getPortType().toString(), confPhyPort.getAttribute());
283 //                long bandwidth = flowCapableNodeConnector.getMaximumSpeed() > 0 ? flowCapableNodeConnector.getMaximumSpeed() : confPhyPort.getBandwidth();
284                 physicalPortBuilder.setPortType(confPhyPort.getPortType());
285                 physicalPortBuilder.setAttribute(confPhyPort.getAttribute());
286             } else {
287                 log.warn("Can not get config info of {}-{} form data broker.", nodeKey.getId(), strConnectorId);
288             }
289             return physicalPortBuilder.build();
290         } else {
291             log.warn("Can not read OF port info of {}-{} form .", nodeKey.getId(), strConnectorId);
292         }
293         return null;
294     }
295
296     private FlowCapableNodeConnector getOFPort(final NodeKey nodeKey, final NodeConnectorKey nodeConnectorKey) {
297         final FlowCapableNodeConnector[] flowCapableNodeConnector = {null};
298
299         InstanceIdentifier<NodeConnector> nodeConnectorInsId = getOFPortInstanceIdentifier(nodeKey, nodeConnectorKey);
300         final CountDownLatch downLatch = new CountDownLatch(1);
301         ListenableFuture<Optional<NodeConnector>> nodeConnectorFuture = dataBroker.newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL, nodeConnectorInsId);
302         Futures.addCallback(nodeConnectorFuture, new FutureCallback<Optional<NodeConnector>>() {
303             @Override
304             public void onSuccess(Optional<NodeConnector> result) {
305                 if (result.isPresent() && result.get() instanceof NodeConnector) {
306                     flowCapableNodeConnector[0] = result.get().getAugmentation(FlowCapableNodeConnector.class);
307                     downLatch.countDown();
308                 }
309
310                 return;
311             }
312
313             @Override
314             public void onFailure(Throwable t) {
315                 log.error("Can not read the information of node connector {}-{} : {}", nodeKey, nodeConnectorKey, t);
316                 downLatch.countDown();
317                 return;
318             }
319         });
320
321         try {
322             downLatch.await();
323         } catch (InterruptedException e) {
324             // TODO Auto-generated catch block
325             log.error("Exception:",e);
326         }
327
328         return flowCapableNodeConnector[0];
329     }
330
331     protected void ofLinkAdded(Link link) {
332         log.debug("OF link added:{}.", link.getKey());
333
334         String srcNode = link.getSource().getSourceNode().getValue();
335         String srcTp = link.getSource().getSourceTp().getValue();
336         String dstNode = link.getDestination().getDestNode().getValue();
337         String dstTp = link.getDestination().getDestTp().getValue();
338
339         String strLinkId = link.getLinkId().getValue();
340         PhysicalLinkBuilder linkBuilder = new PhysicalLinkBuilder();
341         linkBuilder.setLinkId(new PhysicalLinkId(strLinkId));
342         linkBuilder.setSrcNodeId(new PhysicalNodeId(srcNode));
343         linkBuilder.setSrcPortId(new PhysicalPortId(srcTp));
344         linkBuilder.setDestNodeId(new PhysicalNodeId(dstNode));
345         linkBuilder.setDestPortId(new PhysicalPortId(dstTp));
346
347         linkBuilder.setBandwidth(PhyConfigLoader.DEFAULT_LINK_BANDWIDTH);
348         linkBuilder.setDelay(PhyConfigLoader.DEFAULT_LINK_DELAY);
349         linkBuilder.setLossRate(PhyConfigLoader.DEFAULT_LINK_LOSS_RATE);
350
351         PhysicalLinkId physicalLinkId = new PhysicalLinkId(strLinkId);
352         PhysicalLink physicalLink = phyConfigLoader.getPhysicalLink(physicalLinkId);
353         if (physicalLink != null) {
354             linkBuilder.setMetric(physicalLink.getMetric());
355         } else {
356             log.warn("Can not find conf info of {}.", link.getKey());
357         }
358         synchronized (mutex) {
359             physicalLinkSet.add(linkBuilder.build());
360             if (!running) {
361                 phyTimer.schedule(new PhyTransmit(), 10, 500);
362                 running = true;
363             }
364         }
365     }
366
367     protected void ofLinkRemoved(Link link) {
368         log.debug("OF link removed:{}.", link.getKey());
369         String strLinkId = link.getLinkId().getValue();
370         PhysicalLinkId linkId = new PhysicalLinkId(strLinkId);
371         PhysicalLink confPhyLink = phyConfigLoader.getPhysicalLink(linkId);
372         if (confPhyLink == null) {
373             log.warn("Can not find conf info of {} while remove.", link);
374         }
375
376         dataBrokerAdapter.removePhysicalLink(new PhysicalLinkKey(linkId));
377     }
378
379     class PhyTransmit extends TimerTask {
380
381         @Override
382         public void run() {
383             synchronized (mutex) {
384                 for (PhysicalLink physicalLink : physicalLinkSet) {
385                     handleLink(physicalLink);
386                 }
387                 // Cancel timer. - Don't cancel timer, because this will result in
388                 // that some physical links aren't wrote into data store sometimes.
389 //                if (physicalLinkSet.size() == 0) {
390 //                    phyTimer.cancel();
391 //                    running = false;
392 //                }
393             }
394         }
395
396         private void handleLink(PhysicalLink physicalLink) {
397             String srcNodeId = physicalLink.getSrcNodeId().getValue();
398             String dsrNodeId = physicalLink.getDestNodeId().getValue();
399             if (nodeIdSet.contains(srcNodeId) && nodeIdSet.contains(dsrNodeId)) {
400                 physicalLinkSet.remove(physicalLink);
401                 log.debug("Put [{}]-[{}] to data broker.", srcNodeId, dsrNodeId);
402                 dataBrokerAdapter.addPhysicalLink(physicalLink);
403             }
404         }
405     }
406 }