2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.List;
16 import java.util.Map.Entry;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.CopyOnWriteArrayList;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
31 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
32 import org.opendaylight.groupbasedpolicy.endpoint.AbstractEndpointRegistry;
33 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
34 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
35 import org.opendaylight.groupbasedpolicy.util.SetUtils;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ConditionName;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3Address;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3AddressBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayContext;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayContextBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayContextInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
54 import org.opendaylight.yangtools.concepts.ListenerRegistration;
55 import org.opendaylight.yangtools.yang.binding.DataObject;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 import com.google.common.base.Function;
61 import com.google.common.base.Optional;
62 import com.google.common.base.Predicate;
63 import com.google.common.collect.Collections2;
64 import com.google.common.collect.Sets;
65 import com.google.common.util.concurrent.CheckedFuture;
66 import com.google.common.util.concurrent.FutureCallback;
67 import com.google.common.util.concurrent.Futures;
70 * Keep track of endpoints on the system. Maintain an index of endpoints
71 * and their locations for renderering. The endpoint manager will maintain
72 * appropriate indexes only for switches that are attached to the current
75 * In order to render the policy, we need to be able to efficiently enumerate
76 * all endpoints on a particular switch and also all the switches containing
77 * each particular endpoint group
80 public class EndpointManager
81 extends AbstractEndpointRegistry
82 implements AutoCloseable, DataChangeListener
84 private static final Logger LOG =
85 LoggerFactory.getLogger(EndpointManager.class);
86 private final static InstanceIdentifier<Nodes> nodesIid = InstanceIdentifier
87 .builder(Nodes.class).build();
88 private final static InstanceIdentifier<Node> nodeIid = InstanceIdentifier
89 .builder(Nodes.class).child(Node.class).build();
90 private ListenerRegistration<DataChangeListener> nodesReg;
92 private static final InstanceIdentifier<Endpoint> endpointsIid =
93 InstanceIdentifier.builder(Endpoints.class)
94 .child(Endpoint.class).build();
95 final ListenerRegistration<DataChangeListener> listenerReg;
97 private final ConcurrentHashMap<EpKey, Endpoint> endpoints =
98 new ConcurrentHashMap<>();
99 private final ConcurrentHashMap<NodeId,
100 ConcurrentMap<EgKey, Set<EpKey>>> endpointsByNode =
101 new ConcurrentHashMap<>();
102 private final ConcurrentHashMap<EgKey, Set<EpKey>> endpointsByGroup =
103 new ConcurrentHashMap<>();
105 private List<EndpointListener> listeners = new CopyOnWriteArrayList<>();
107 public EndpointManager(DataBroker dataProvider,
108 RpcProviderRegistry rpcRegistry,
109 ScheduledExecutorService executor,
110 SwitchManager switchManager) {
111 super(dataProvider, rpcRegistry, executor);
113 if (dataProvider != null) {
114 listenerReg = dataProvider
115 .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
118 DataChangeScope.ONE);
119 nodesReg = dataProvider.registerDataChangeListener(
120 LogicalDatastoreType.OPERATIONAL, nodeIid,
121 new NodesListener(), DataChangeScope.SUBTREE);
126 LOG.debug("Initialized OFOverlay endpoint manager");
134 * Add a {@link EndpointListener} to get notifications of switch events
135 * @param listener the {@link EndpointListener} to add
137 public void registerListener(EndpointListener listener) {
138 listeners.add(listener);
142 * Get a collection of endpoints attached to a particular switch
143 * @param nodeId the nodeId of the switch to get endpoints for
144 * @return a collection of {@link Endpoint} objects.
146 public Set<EgKey> getGroupsForNode(NodeId nodeId) {
147 Map<EgKey, Set<EpKey>> nodeEps = endpointsByNode.get(nodeId);
148 if (nodeEps == null) return Collections.emptySet();
149 return Collections.unmodifiableSet(nodeEps.keySet());
153 * Get the set of nodes
154 * @param nodeId the nodeId of the switch to get endpoints for
155 * @return a collection of {@link Endpoint} objects.
157 public Set<NodeId> getNodesForGroup(final EgKey egKey) {
158 return Collections.unmodifiableSet(Sets.filter(endpointsByNode.keySet(),
159 new Predicate<NodeId>() {
161 public boolean apply(NodeId input) {
162 Map<EgKey, Set<EpKey>> nodeEps =
163 endpointsByNode.get(input);
164 return (nodeEps != null &&
165 nodeEps.containsKey(egKey));
172 * Get the endpoints in a particular group on a particular node
173 * @param nodeId the node ID to look up
174 * @param eg the group to look up
175 * @return the endpoints
177 public Collection<Endpoint> getEPsForNode(NodeId nodeId, EgKey eg) {
178 Map<EgKey, Set<EpKey>> nodeEps = endpointsByNode.get(nodeId);
179 if (nodeEps == null) return Collections.emptyList();
180 Collection<EpKey> ebn = nodeEps.get(eg);
181 if (ebn == null) return Collections.emptyList();
182 return Collections.unmodifiableCollection(Collections2
188 * Get the endpoint object for the given key
189 * @param epKey the key
190 * @return the {@link Endpoint} corresponding to the key
192 public Endpoint getEndpoint(EpKey epKey) {
193 return endpoints.get(epKey);
197 * Set the learning mode to the specified value
198 * @param learningMode the learning mode to set
200 public void setLearningMode(LearningMode learningMode) {
205 * Get a collection of endpoints in a particular endpoint group
206 * @param nodeId the nodeId of the switch to get endpoints for
207 * @return a collection of {@link Endpoint} objects.
209 public Collection<Endpoint> getEndpointsForGroup(EgKey eg) {
210 Collection<EpKey> ebg = endpointsByGroup.get(eg);
211 if (ebg == null) return Collections.emptyList();
212 return Collections2.transform(ebg, indexTransform);
216 * Get the effective list of conditions that apply to a particular
217 * endpoint. This could include additional conditions over the condition
218 * labels directly represented in the endpoint object
219 * @param endpoint the {@link Endpoint} to resolve
220 * @return the list of {@link ConditionName}
222 public List<ConditionName> getCondsForEndpoint(Endpoint endpoint) {
223 // XXX TODO consider group conditions as well. Also need to notify
224 // endpoint updated if the endpoint group conditions change
225 if (endpoint.getCondition() != null)
226 return endpoint.getCondition();
227 else return Collections.emptyList();
230 // ************************
231 // AbstractEndpointRegistry
232 // ************************
235 protected EndpointBuilder buildEndpoint(RegisterEndpointInput input) {
236 // In order to support both the port-name and the data-path information, allow
237 // an EP to register without the augmentations, and resolve later.
238 OfOverlayContextBuilder ictx = checkAugmentation(input);
240 return super.buildEndpoint(input);
242 return super.buildEndpoint(input).addAugmentation(OfOverlayContext.class, ictx.build());
247 protected EndpointL3Builder buildEndpointL3(RegisterEndpointInput input) {
248 return super.buildEndpointL3(input);
256 public void close() throws Exception {
257 if (listenerReg != null) listenerReg.close();
261 // ******************
262 // DataChangeListener
263 // ******************
266 public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
267 for (DataObject dao : change.getCreatedData().values()) {
268 if (dao instanceof Endpoint)
269 updateEndpoint(null, (Endpoint)dao);
271 for (InstanceIdentifier<?> iid : change.getRemovedPaths()) {
272 DataObject old = change.getOriginalData().get(iid);
273 if (old != null && old instanceof Endpoint)
274 updateEndpoint((Endpoint)old, null);
276 Map<InstanceIdentifier<?>,DataObject> d = change.getUpdatedData();
277 for (Entry<InstanceIdentifier<?>, DataObject> entry : d.entrySet()) {
278 if (!(entry.getValue() instanceof Endpoint)) continue;
279 DataObject old = change.getOriginalData().get(entry.getKey());
280 Endpoint oldEp = null;
281 if (old != null && old instanceof Endpoint)
282 oldEp = (Endpoint)old;
283 updateEndpoint(oldEp, (Endpoint)entry.getValue());
287 private class NodesListener implements DataChangeListener {
289 public void onDataChanged(
290 AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
291 for (DataObject dao : change.getCreatedData().values()) {
292 if (!(dao instanceof Node))
294 Node node = (Node) dao;
295 if (node.getNodeConnector() != null) {
296 executor.execute(new UpdateEndpoint(node));
300 for (DataObject dao : change.getUpdatedData().values()) {
301 if (!(dao instanceof Node))
303 Node node = (Node) dao;
304 if (node.getNodeConnector() != null) {
305 executor.execute(new UpdateEndpoint(node));
312 private class UpdateEndpoint implements Runnable {
313 private final Node node;
314 private final InstanceIdentifier<Endpoints> endpointsIid;
316 public UpdateEndpoint(Node node) {
318 this.endpointsIid=InstanceIdentifier.builder(Endpoints.class).build();
322 Optional<Endpoints> epResult;
324 for (NodeConnector nc : node.getNodeConnector()) {
325 FlowCapableNodeConnector fcnc = nc
326 .getAugmentation(FlowCapableNodeConnector.class);
328 epResult = dataProvider.newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL, endpointsIid).get();
329 if(epResult.isPresent()) {
330 Endpoints endpoints = epResult.get();
331 if(endpoints.getEndpoint() != null) {
332 WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
333 Boolean isEmpty = true;
334 for (Endpoint ep : endpoints.getEndpoint()){
335 // 2. Search for portname
336 OfOverlayContext currentAugmentation = ep.getAugmentation(OfOverlayContext.class);
337 if(ep.getPortName().getValue().equals(fcnc.getName())) {
339 NodeConnectorId nodeConnectorId;
341 nodeId=currentAugmentation.getNodeId();
342 nodeConnectorId=currentAugmentation.getNodeConnectorId();
343 } catch (Exception e) {
345 nodeConnectorId = null;
347 Boolean process=false;
348 if(nodeId==null && nodeConnectorId ==null) {
349 LOG.debug("ep NodeID and NC ID Both null");
352 if(nodeId!=null && nodeConnectorId !=null) {
353 if (!(nodeConnectorId.getValue().equals(nc.getId().getValue()))) {
354 LOG.debug("ep NodeID and NC ID Both NOT null but epNCID !=nodeNCID");
359 // 3. Update endpoint
360 EndpointBuilder epBuilder = new EndpointBuilder(ep);
361 OfOverlayContextBuilder ofOverlayAugmentation = new OfOverlayContextBuilder();
362 ofOverlayAugmentation.setNodeId(node.getId());
363 ofOverlayAugmentation.setNodeConnectorId(nc.getId());
364 epBuilder.addAugmentation(OfOverlayContext.class,ofOverlayAugmentation.build());
365 //TODO Hack to remove:
366 List<L3Address> l3Addresses= new ArrayList<>();
367 for(L3Address l3Address: ep.getL3Address()) {
368 L3AddressBuilder l3AB = new L3AddressBuilder();
369 l3AB.setIpAddress(l3Address.getIpAddress()).setL3Context(l3Address.getL3Context());
370 l3Addresses.add(l3AB.build());
372 epBuilder.setL3Address(l3Addresses);
373 InstanceIdentifier<Endpoint> iidEp = InstanceIdentifier.builder(Endpoints.class).child(Endpoint.class,ep.getKey()).build();
374 tx.put(LogicalDatastoreType.OPERATIONAL, iidEp, epBuilder.build());
375 epKey=new EpKey(ep.getKey().getL2Context(),ep.getKey().getMacAddress());
376 LOG.debug("Values:");
377 LOG.debug("node: Node ID:"+node.getId().getValue());
378 LOG.debug("node: NodeConnectorID: "+nc.getId().getValue());
379 if(nodeId!=null && nodeConnectorId != null) {
380 LOG.debug("ep: nodeID:"+nodeId.getValue());
381 LOG.debug("ep: nodeConnectorID:"+nodeConnectorId.getValue());
388 CheckedFuture<Void, TransactionCommitFailedException> f = tx.submit();
389 notifyEndpointUpdated(epKey);
390 Futures.addCallback(f, new FutureCallback<Void>() {
392 public void onFailure(Throwable t) {
393 LOG.error("Could not over-write endpoint with augmentation", t);
397 public void onSuccess(Void result) {
398 LOG.debug("Success over-writing endpoint augmentation");
402 LOG.debug("UpdateEndpoint: Empty list");
406 } catch (InterruptedException | ExecutionException e) {
408 LOG.warn("Caught exception in UpdateEndpoint");
417 private void notifyEndpointUpdated(EpKey epKey) {
418 for (EndpointListener l : listeners) {
419 l.endpointUpdated(epKey);
423 private void notifyNodeEndpointUpdated(NodeId nodeId, EpKey epKey) {
424 for (EndpointListener l : listeners) {
425 l.nodeEndpointUpdated(nodeId, epKey);
429 private void notifyGroupEndpointUpdated(EgKey egKey, EpKey epKey) {
430 for (EndpointListener l : listeners) {
431 l.groupEndpointUpdated(egKey, epKey);
435 private Function<EpKey, Endpoint> indexTransform =
436 new Function<EpKey, Endpoint>() {
438 public Endpoint apply(EpKey input) {
439 return endpoints.get(input);
443 private boolean validEp(Endpoint endpoint) {
444 return (endpoint != null && endpoint.getTenant() != null &&
445 endpoint.getEndpointGroup() != null &&
446 endpoint.getL2Context() != null &&
447 endpoint.getMacAddress() != null);
450 private NodeId getLocation(Endpoint endpoint) {
451 if (!validEp(endpoint))
453 OfOverlayContext context =
454 endpoint.getAugmentation(OfOverlayContext.class);
456 return context.getNodeId();
461 private EpKey getEpKey(Endpoint endpoint) {
462 if (!validEp(endpoint))
464 return new EpKey(endpoint.getL2Context(), endpoint.getMacAddress());
467 private EgKey getEgKey(Endpoint endpoint) {
468 if (!validEp(endpoint))
470 return new EgKey(endpoint.getTenant(), endpoint.getEndpointGroup());
473 private Set<EpKey> getEpNGSet(NodeId location, EgKey eg) {
474 ConcurrentMap<EgKey, Set<EpKey>> map = endpointsByNode.get(location);
476 map = new ConcurrentHashMap<>();
477 ConcurrentMap<EgKey, Set<EpKey>> old =
478 endpointsByNode.putIfAbsent(location, map);
482 return SetUtils.getNestedSet(eg, map);
485 private static final ConcurrentMap<EgKey, Set<EpKey>> EMPTY_MAP =
486 new ConcurrentHashMap<>();
488 private Set<EpKey> getEpGSet(EgKey eg) {
489 return SetUtils.getNestedSet(eg, endpointsByGroup);
493 * Update the endpoint indexes. Set newEp to null to remove.
495 protected void updateEndpoint(Endpoint oldEp, Endpoint newEp) {
496 // XXX TODO only keep track of endpoints that are attached
497 // to switches that are actually connected to us
498 NodeId oldLoc = getLocation(oldEp);
499 NodeId newLoc = getLocation(newEp);
501 EgKey oldKey = getEgKey(oldEp);
502 EgKey newKey = getEgKey(newEp);
504 EpKey epKey = getEpKey(oldEp);
505 if (epKey == null) epKey = getEpKey(newEp);
506 if (epKey == null) return;
508 boolean notifyOldLoc = false;
509 boolean notifyNewLoc = false;
510 boolean notifyOldEg = false;
511 boolean notifyNewEg = false;
514 endpoints.put(epKey, newEp);
516 if (oldLoc != null && oldKey != null &&
517 (newLoc == null || !oldLoc.equals(newLoc) ||
518 newKey == null || !oldKey.equals(newKey))) {
519 ConcurrentMap<EgKey, Set<EpKey>> map =
520 endpointsByNode.get(oldLoc);
521 Set<EpKey> eps = map.get(oldKey);
523 map.remove(oldKey, Collections.emptySet());
524 endpointsByNode.remove(oldLoc, EMPTY_MAP);
527 if (oldKey != null &&
528 (newKey == null || !oldKey.equals(newKey))) {
529 Set<EpKey> gns = getEpGSet(oldKey);
534 if (newLoc != null && newKey != null) {
535 Set<EpKey> eps = getEpNGSet(newLoc, newKey);
537 LOG.debug("Endpoint {} added to node {}", epKey, newLoc);
540 if (newKey != null) {
541 Set<EpKey> gns = getEpGSet(newKey);
543 LOG.debug("Endpoint {} added to group {}", epKey, newKey);
548 endpoints.remove(epKey);
550 notifyEndpointUpdated(epKey);
553 notifyNodeEndpointUpdated(oldLoc,epKey);
555 notifyNodeEndpointUpdated(newLoc,epKey);
557 notifyGroupEndpointUpdated(oldKey, epKey);
559 notifyGroupEndpointUpdated(newKey, epKey);
562 private OfOverlayContextBuilder checkAugmentation(RegisterEndpointInput input) {
563 OfOverlayContextBuilder ictxBuilder=new OfOverlayContextBuilder();
564 OfOverlayContextInput ictx =null;
566 ictx = input.getAugmentation(OfOverlayContextInput.class);
568 ictxBuilder.setNodeConnectorId(ictx.getNodeConnectorId());
569 ictxBuilder.setNodeId(ictx.getNodeId());
570 } else if(input.getPortName()!=null) {
571 NodeInfo augmentation = fetchAugmentation(input.getPortName().getValue());
572 if(augmentation != null) {
573 ictxBuilder.setNodeId(augmentation.getNode().getId());
574 ictxBuilder.setNodeConnectorId(augmentation.getNodeConnector().getId());
582 // A wrapper class around node, noeConnector info so we can pass a final
583 // object inside OnSuccess anonymous inner class
584 private static class NodeInfo {
585 NodeConnector nodeConnector;
592 private NodeInfo(NodeConnector nc, Node node) {
593 this.nodeConnector = nc;
597 private Node getNode() {
601 private NodeConnector getNodeConnector() {
602 return this.nodeConnector;
605 public void setNodeConnector(NodeConnector nodeConnector) {
606 this.nodeConnector = nodeConnector;
609 public void setNode(Node node) {
614 private NodeInfo fetchAugmentation(String portName) {
615 NodeInfo nodeInfo=null;
617 if (dataProvider != null) {
619 Optional<Nodes> result;
621 result = dataProvider
622 .newReadOnlyTransaction().read(
623 LogicalDatastoreType.OPERATIONAL, nodesIid).get();
624 if (result.isPresent()) {
625 Nodes nodes = result.get();
626 for (Node node : nodes.getNode()) {
627 if (node.getNodeConnector() != null) {
628 boolean found = false;
629 for (NodeConnector nc : node.getNodeConnector()) {
630 FlowCapableNodeConnector fcnc = nc
631 .getAugmentation(FlowCapableNodeConnector.class);
632 if (fcnc.getName().equals(portName)) {
633 nodeInfo=new NodeInfo();
634 nodeInfo.setNode(node);
635 nodeInfo.setNodeConnector(nc);
644 } catch (InterruptedException | ExecutionException e) {
645 // TODO Auto-generated catch block