2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Collections;
14 import java.util.List;
16 import java.util.Map.Entry;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.CopyOnWriteArrayList;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
31 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
32 import org.opendaylight.groupbasedpolicy.endpoint.EndpointRpcRegistry;
33 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
34 import org.opendaylight.groupbasedpolicy.endpoint.EpRendererAugmentation;
35 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
36 import org.opendaylight.groupbasedpolicy.util.SetUtils;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ConditionName;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterL3PrefixEndpointInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3Address;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3AddressBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3PrefixBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayContext;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayContextBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayContextInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
57 import org.opendaylight.yangtools.concepts.ListenerRegistration;
58 import org.opendaylight.yangtools.yang.binding.DataObject;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
63 import com.google.common.base.Function;
64 import com.google.common.base.Optional;
65 import com.google.common.base.Predicate;
66 import com.google.common.collect.Collections2;
67 import com.google.common.collect.Sets;
68 import com.google.common.util.concurrent.CheckedFuture;
69 import com.google.common.util.concurrent.FutureCallback;
70 import com.google.common.util.concurrent.Futures;
73 * Keep track of endpoints on the system. Maintain an index of endpoints
74 * and their locations for renderering. The endpoint manager will maintain
75 * appropriate indexes only for switches that are attached to the current
78 * In order to render the policy, we need to be able to efficiently enumerate
79 * all endpoints on a particular switch and also all the switches containing
80 * each particular endpoint group
83 public class EndpointManager implements AutoCloseable, DataChangeListener
85 private static final Logger LOG =
86 LoggerFactory.getLogger(EndpointManager.class);
87 private final static InstanceIdentifier<Nodes> nodesIid = InstanceIdentifier
88 .builder(Nodes.class).build();
89 private final static InstanceIdentifier<Node> nodeIid = InstanceIdentifier
90 .builder(Nodes.class).child(Node.class).build();
91 private ListenerRegistration<DataChangeListener> nodesReg;
93 private static final InstanceIdentifier<Endpoint> endpointsIid =
94 InstanceIdentifier.builder(Endpoints.class)
95 .child(Endpoint.class).build();
96 final ListenerRegistration<DataChangeListener> listenerReg;
98 private final ConcurrentHashMap<EpKey, Endpoint> endpoints =
99 new ConcurrentHashMap<>();
100 private final ConcurrentHashMap<NodeId,
101 ConcurrentMap<EgKey, Set<EpKey>>> endpointsByNode =
102 new ConcurrentHashMap<>();
103 private final ConcurrentHashMap<EgKey, Set<EpKey>> endpointsByGroup =
104 new ConcurrentHashMap<>();
106 private List<EndpointListener> listeners = new CopyOnWriteArrayList<>();
108 final private OfEndpointAug endpointRpcAug = new OfEndpointAug();
110 final private ScheduledExecutorService executor;
112 final private DataBroker dataProvider;
114 public EndpointManager(DataBroker dataProvider,
115 RpcProviderRegistry rpcRegistry,
116 ScheduledExecutorService executor,
117 SwitchManager switchManager) {
118 this.executor = executor;
119 this.dataProvider = dataProvider;
120 EndpointRpcRegistry.register(dataProvider, rpcRegistry, executor, endpointRpcAug);
121 if (dataProvider != null) {
122 listenerReg = dataProvider
123 .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
126 DataChangeScope.ONE);
127 nodesReg = dataProvider.registerDataChangeListener(
128 LogicalDatastoreType.OPERATIONAL, nodeIid,
129 new NodesListener(), DataChangeScope.SUBTREE);
134 LOG.debug("Initialized OFOverlay endpoint manager");
142 * Add a {@link EndpointListener} to get notifications of switch events
143 * @param listener the {@link EndpointListener} to add
145 public void registerListener(EndpointListener listener) {
146 listeners.add(listener);
150 * Get a collection of endpoints attached to a particular switch
151 * @param nodeId the nodeId of the switch to get endpoints for
152 * @return a collection of {@link Endpoint} objects.
154 public Set<EgKey> getGroupsForNode(NodeId nodeId) {
155 Map<EgKey, Set<EpKey>> nodeEps = endpointsByNode.get(nodeId);
156 if (nodeEps == null) return Collections.emptySet();
157 return Collections.unmodifiableSet(nodeEps.keySet());
161 * Get the set of nodes
162 * @param nodeId the nodeId of the switch to get endpoints for
163 * @return a collection of {@link Endpoint} objects.
165 public Set<NodeId> getNodesForGroup(final EgKey egKey) {
166 return Collections.unmodifiableSet(Sets.filter(endpointsByNode.keySet(),
167 new Predicate<NodeId>() {
169 public boolean apply(NodeId input) {
170 Map<EgKey, Set<EpKey>> nodeEps =
171 endpointsByNode.get(input);
172 return (nodeEps != null &&
173 nodeEps.containsKey(egKey));
180 * Get the endpoints in a particular group on a particular node
181 * @param nodeId the node ID to look up
182 * @param eg the group to look up
183 * @return the endpoints
185 public Collection<Endpoint> getEPsForNode(NodeId nodeId, EgKey eg) {
186 Map<EgKey, Set<EpKey>> nodeEps = endpointsByNode.get(nodeId);
187 if (nodeEps == null) return Collections.emptyList();
188 Collection<EpKey> ebn = nodeEps.get(eg);
189 if (ebn == null) return Collections.emptyList();
190 return Collections.unmodifiableCollection(Collections2
196 * Get the endpoint object for the given key
197 * @param epKey the key
198 * @return the {@link Endpoint} corresponding to the key
200 public Endpoint getEndpoint(EpKey epKey) {
201 return endpoints.get(epKey);
205 * Set the learning mode to the specified value
206 * @param learningMode the learning mode to set
208 public void setLearningMode(LearningMode learningMode) {
213 * Get a collection of endpoints in a particular endpoint group
214 * @param nodeId the nodeId of the switch to get endpoints for
215 * @return a collection of {@link Endpoint} objects.
217 public Collection<Endpoint> getEndpointsForGroup(EgKey eg) {
218 Collection<EpKey> ebg = endpointsByGroup.get(eg);
219 if (ebg == null) return Collections.emptyList();
220 return Collections2.transform(ebg, indexTransform);
224 * Get the effective list of conditions that apply to a particular
225 * endpoint. This could include additional conditions over the condition
226 * labels directly represented in the endpoint object
227 * @param endpoint the {@link Endpoint} to resolve
228 * @return the list of {@link ConditionName}
230 public List<ConditionName> getCondsForEndpoint(Endpoint endpoint) {
231 // XXX TODO consider group conditions as well. Also need to notify
232 // endpoint updated if the endpoint group conditions change
233 if (endpoint.getCondition() != null)
234 return endpoint.getCondition();
235 else return Collections.emptyList();
238 // ************************
239 // Endpoint Augmentation
240 // ************************
241 private class OfEndpointAug implements EpRendererAugmentation {
244 public void buildEndpointAugmentation(EndpointBuilder eb,
245 RegisterEndpointInput input) {
246 // In order to support both the port-name and the data-path information, allow
247 // an EP to register without the augmentations, and resolve later.
248 OfOverlayContextBuilder ictx = checkAugmentation(input);
250 eb.addAugmentation(OfOverlayContext.class, ictx.build());
255 public void buildEndpointL3Augmentation(EndpointL3Builder eb,
256 RegisterEndpointInput input) {
260 public void buildL3PrefixEndpointAugmentation(EndpointL3PrefixBuilder eb, RegisterL3PrefixEndpointInput input) {
261 // TODO Auto-generated method stub
271 public void close() throws Exception {
272 if (listenerReg != null) listenerReg.close();
273 EndpointRpcRegistry.unregister(endpointRpcAug);
276 // ******************
277 // DataChangeListener
278 // ******************
281 public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
282 for (DataObject dao : change.getCreatedData().values()) {
283 if (dao instanceof Endpoint)
284 updateEndpoint(null, (Endpoint)dao);
286 for (InstanceIdentifier<?> iid : change.getRemovedPaths()) {
287 DataObject old = change.getOriginalData().get(iid);
288 if (old != null && old instanceof Endpoint)
289 updateEndpoint((Endpoint)old, null);
291 Map<InstanceIdentifier<?>,DataObject> d = change.getUpdatedData();
292 for (Entry<InstanceIdentifier<?>, DataObject> entry : d.entrySet()) {
293 if (!(entry.getValue() instanceof Endpoint)) continue;
294 DataObject old = change.getOriginalData().get(entry.getKey());
295 Endpoint oldEp = null;
296 if (old != null && old instanceof Endpoint)
297 oldEp = (Endpoint)old;
298 updateEndpoint(oldEp, (Endpoint)entry.getValue());
302 private class NodesListener implements DataChangeListener {
304 public void onDataChanged(
305 AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
306 for (DataObject dao : change.getCreatedData().values()) {
307 if (!(dao instanceof Node))
309 Node node = (Node) dao;
310 if (node.getNodeConnector() != null) {
311 executor.execute(new UpdateEndpoint(node));
315 for (DataObject dao : change.getUpdatedData().values()) {
316 if (!(dao instanceof Node))
318 Node node = (Node) dao;
319 if (node.getNodeConnector() != null) {
320 executor.execute(new UpdateEndpoint(node));
327 private class UpdateEndpoint implements Runnable {
328 private final Node node;
329 private final InstanceIdentifier<Endpoints> endpointsIid;
331 public UpdateEndpoint(Node node) {
333 this.endpointsIid=InstanceIdentifier.builder(Endpoints.class).build();
337 Optional<Endpoints> epResult;
339 for (NodeConnector nc : node.getNodeConnector()) {
340 FlowCapableNodeConnector fcnc = nc
341 .getAugmentation(FlowCapableNodeConnector.class);
343 epResult = dataProvider.newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL, endpointsIid).get();
344 if(epResult.isPresent()) {
345 Endpoints endpoints = epResult.get();
346 if(endpoints.getEndpoint() != null) {
347 WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
348 Boolean isEmpty = true;
349 for (Endpoint ep : endpoints.getEndpoint()){
350 // 2. Search for portname
351 OfOverlayContext currentAugmentation = ep.getAugmentation(OfOverlayContext.class);
352 if(ep.getPortName().getValue().equals(fcnc.getName())) {
354 NodeConnectorId nodeConnectorId;
356 nodeId=currentAugmentation.getNodeId();
357 nodeConnectorId=currentAugmentation.getNodeConnectorId();
358 } catch (Exception e) {
360 nodeConnectorId = null;
362 Boolean process=false;
363 if(nodeId==null && nodeConnectorId ==null) {
364 LOG.debug("ep NodeID and NC ID Both null");
367 if(nodeId!=null && nodeConnectorId !=null) {
368 if (!(nodeConnectorId.getValue().equals(nc.getId().getValue()))) {
369 LOG.debug("ep NodeID and NC ID Both NOT null but epNCID !=nodeNCID");
374 // 3. Update endpoint
375 EndpointBuilder epBuilder = new EndpointBuilder(ep);
376 OfOverlayContextBuilder ofOverlayAugmentation = new OfOverlayContextBuilder();
377 ofOverlayAugmentation.setNodeId(node.getId());
378 ofOverlayAugmentation.setNodeConnectorId(nc.getId());
379 epBuilder.addAugmentation(OfOverlayContext.class,ofOverlayAugmentation.build());
380 //TODO Hack to remove:
381 List<L3Address> l3Addresses= new ArrayList<>();
382 for(L3Address l3Address: ep.getL3Address()) {
383 L3AddressBuilder l3AB = new L3AddressBuilder();
384 l3AB.setIpAddress(l3Address.getIpAddress()).setL3Context(l3Address.getL3Context());
385 l3Addresses.add(l3AB.build());
387 epBuilder.setL3Address(l3Addresses);
388 InstanceIdentifier<Endpoint> iidEp = InstanceIdentifier.builder(Endpoints.class).child(Endpoint.class,ep.getKey()).build();
389 tx.put(LogicalDatastoreType.OPERATIONAL, iidEp, epBuilder.build());
390 epKey=new EpKey(ep.getKey().getL2Context(),ep.getKey().getMacAddress());
391 LOG.debug("Values:");
392 LOG.debug("node: Node ID:"+node.getId().getValue());
393 LOG.debug("node: NodeConnectorID: "+nc.getId().getValue());
394 if(nodeId!=null && nodeConnectorId != null) {
395 LOG.debug("ep: nodeID:"+nodeId.getValue());
396 LOG.debug("ep: nodeConnectorID:"+nodeConnectorId.getValue());
403 CheckedFuture<Void, TransactionCommitFailedException> f = tx.submit();
404 notifyEndpointUpdated(epKey);
405 Futures.addCallback(f, new FutureCallback<Void>() {
407 public void onFailure(Throwable t) {
408 LOG.error("Could not over-write endpoint with augmentation", t);
412 public void onSuccess(Void result) {
413 LOG.debug("Success over-writing endpoint augmentation");
417 LOG.debug("UpdateEndpoint: Empty list");
421 } catch (InterruptedException | ExecutionException e) {
422 LOG.error("Caught exception in UpdateEndpoint",e);
431 private void notifyEndpointUpdated(EpKey epKey) {
432 for (EndpointListener l : listeners) {
433 l.endpointUpdated(epKey);
437 private void notifyNodeEndpointUpdated(NodeId nodeId, EpKey epKey) {
438 for (EndpointListener l : listeners) {
439 l.nodeEndpointUpdated(nodeId, epKey);
443 private void notifyGroupEndpointUpdated(EgKey egKey, EpKey epKey) {
444 for (EndpointListener l : listeners) {
445 l.groupEndpointUpdated(egKey, epKey);
449 private Function<EpKey, Endpoint> indexTransform =
450 new Function<EpKey, Endpoint>() {
452 public Endpoint apply(EpKey input) {
453 return endpoints.get(input);
457 private boolean validEp(Endpoint endpoint) {
458 return (endpoint != null && endpoint.getTenant() != null &&
459 endpoint.getEndpointGroup() != null &&
460 endpoint.getL2Context() != null &&
461 endpoint.getMacAddress() != null);
464 private NodeId getLocation(Endpoint endpoint) {
465 if (!validEp(endpoint))
467 OfOverlayContext context =
468 endpoint.getAugmentation(OfOverlayContext.class);
470 return context.getNodeId();
475 private EpKey getEpKey(Endpoint endpoint) {
476 if (!validEp(endpoint))
478 return new EpKey(endpoint.getL2Context(), endpoint.getMacAddress());
481 private EgKey getEgKey(Endpoint endpoint) {
482 if (!validEp(endpoint))
484 return new EgKey(endpoint.getTenant(), endpoint.getEndpointGroup());
487 private Set<EpKey> getEpNGSet(NodeId location, EgKey eg) {
488 ConcurrentMap<EgKey, Set<EpKey>> map = endpointsByNode.get(location);
490 map = new ConcurrentHashMap<>();
491 ConcurrentMap<EgKey, Set<EpKey>> old =
492 endpointsByNode.putIfAbsent(location, map);
496 return SetUtils.getNestedSet(eg, map);
499 private static final ConcurrentMap<EgKey, Set<EpKey>> EMPTY_MAP =
500 new ConcurrentHashMap<>();
502 private Set<EpKey> getEpGSet(EgKey eg) {
503 return SetUtils.getNestedSet(eg, endpointsByGroup);
507 * Update the endpoint indexes. Set newEp to null to remove.
509 protected void updateEndpoint(Endpoint oldEp, Endpoint newEp) {
510 // XXX TODO only keep track of endpoints that are attached
511 // to switches that are actually connected to us
512 NodeId oldLoc = getLocation(oldEp);
513 NodeId newLoc = getLocation(newEp);
515 EgKey oldKey = getEgKey(oldEp);
516 EgKey newKey = getEgKey(newEp);
518 EpKey epKey = getEpKey(oldEp);
519 if (epKey == null) epKey = getEpKey(newEp);
520 if (epKey == null) return;
522 boolean notifyOldLoc = false;
523 boolean notifyNewLoc = false;
524 boolean notifyOldEg = false;
525 boolean notifyNewEg = false;
528 endpoints.put(epKey, newEp);
530 if (oldLoc != null && oldKey != null &&
531 (newLoc == null || !oldLoc.equals(newLoc) ||
532 newKey == null || !oldKey.equals(newKey))) {
533 ConcurrentMap<EgKey, Set<EpKey>> map =
534 endpointsByNode.get(oldLoc);
535 Set<EpKey> eps = map.get(oldKey);
537 map.remove(oldKey, Collections.emptySet());
538 endpointsByNode.remove(oldLoc, EMPTY_MAP);
541 if (oldKey != null &&
542 (newKey == null || !oldKey.equals(newKey))) {
543 Set<EpKey> gns = getEpGSet(oldKey);
548 if (newLoc != null && newKey != null) {
549 Set<EpKey> eps = getEpNGSet(newLoc, newKey);
551 LOG.debug("Endpoint {} added to node {}", epKey, newLoc);
554 if (newKey != null) {
555 Set<EpKey> gns = getEpGSet(newKey);
557 LOG.debug("Endpoint {} added to group {}", epKey, newKey);
562 endpoints.remove(epKey);
564 notifyEndpointUpdated(epKey);
567 notifyNodeEndpointUpdated(oldLoc,epKey);
569 notifyNodeEndpointUpdated(newLoc,epKey);
571 notifyGroupEndpointUpdated(oldKey, epKey);
573 notifyGroupEndpointUpdated(newKey, epKey);
576 private OfOverlayContextBuilder checkAugmentation(RegisterEndpointInput input) {
577 OfOverlayContextBuilder ictxBuilder=new OfOverlayContextBuilder();
578 OfOverlayContextInput ictx =null;
580 ictx = input.getAugmentation(OfOverlayContextInput.class);
582 ictxBuilder.setNodeConnectorId(ictx.getNodeConnectorId());
583 ictxBuilder.setNodeId(ictx.getNodeId());
584 } else if(input.getPortName()!=null) {
585 NodeInfo augmentation = fetchAugmentation(input.getPortName().getValue());
586 if(augmentation != null) {
587 ictxBuilder.setNodeId(augmentation.getNode().getId());
588 ictxBuilder.setNodeConnectorId(augmentation.getNodeConnector().getId());
596 // A wrapper class around node, noeConnector info so we can pass a final
597 // object inside OnSuccess anonymous inner class
598 private static class NodeInfo {
599 NodeConnector nodeConnector;
606 private NodeInfo(NodeConnector nc, Node node) {
607 this.nodeConnector = nc;
611 private Node getNode() {
615 private NodeConnector getNodeConnector() {
616 return this.nodeConnector;
619 public void setNodeConnector(NodeConnector nodeConnector) {
620 this.nodeConnector = nodeConnector;
623 public void setNode(Node node) {
628 private NodeInfo fetchAugmentation(String portName) {
629 NodeInfo nodeInfo=null;
631 if (dataProvider != null) {
633 Optional<Nodes> result;
635 result = dataProvider
636 .newReadOnlyTransaction().read(
637 LogicalDatastoreType.OPERATIONAL, nodesIid).get();
638 if (result.isPresent()) {
639 Nodes nodes = result.get();
640 for (Node node : nodes.getNode()) {
641 if (node.getNodeConnector() != null) {
642 boolean found = false;
643 for (NodeConnector nc : node.getNodeConnector()) {
644 FlowCapableNodeConnector fcnc = nc
645 .getAugmentation(FlowCapableNodeConnector.class);
646 if (fcnc.getName().equals(portName)) {
647 nodeInfo=new NodeInfo();
648 nodeInfo.setNode(node);
649 nodeInfo.setNodeConnector(nc);
658 } catch (InterruptedException | ExecutionException e) {
659 LOG.error("Could not fetch Node Augmentation",e);