package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.groupbasedpolicy.endpoint.AbstractEndpointRegistry;
import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
import org.opendaylight.groupbasedpolicy.resolver.EgKey;
import org.opendaylight.groupbasedpolicy.util.SetUtils;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ConditionName;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3Address;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoint.fields.L3AddressBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayContextBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayContextInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
/**
* Keep track of endpoints on the system. Maintain an index of endpoints
{
private static final Logger LOG =
LoggerFactory.getLogger(EndpointManager.class);
-
+ private final static InstanceIdentifier<Nodes> nodesIid = InstanceIdentifier
+ .builder(Nodes.class).build();
+ private final static InstanceIdentifier<Node> nodeIid = InstanceIdentifier
+ .builder(Nodes.class).child(Node.class).build();
+ private ListenerRegistration<DataChangeListener> nodesReg;
+
private static final InstanceIdentifier<Endpoint> endpointsIid =
InstanceIdentifier.builder(Endpoints.class)
.child(Endpoint.class).build();
endpointsIid,
this,
DataChangeScope.ONE);
+ nodesReg = dataProvider.registerDataChangeListener(
+ LogicalDatastoreType.OPERATIONAL, nodeIid,
+ new NodesListener(), DataChangeScope.SUBTREE);
+
} else
listenerReg = null;
@Override
protected EndpointBuilder buildEndpoint(RegisterEndpointInput input) {
- OfOverlayContextInput ictx =
- input.getAugmentation(OfOverlayContextInput.class);
- return super.buildEndpoint(input)
- .addAugmentation(OfOverlayContext.class,
- new OfOverlayContextBuilder(ictx).build());
+ // In order to support both the port-name and the data-path information, allow
+ // an EP to register without the augmentations, and resolve later.
+ OfOverlayContextBuilder ictx = checkAugmentation(input);
+ if(ictx == null) {
+ return super.buildEndpoint(input);
+ } else {
+ return super.buildEndpoint(input).addAugmentation(OfOverlayContext.class, ictx.build());
+ }
}
@Override
updateEndpoint(oldEp, (Endpoint)entry.getValue());
}
}
+
+ private class NodesListener implements DataChangeListener {
+ @Override
+ public void onDataChanged(
+ AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+ for (DataObject dao : change.getCreatedData().values()) {
+ if (!(dao instanceof Node))
+ continue;
+ Node node = (Node) dao;
+ if (node.getNodeConnector() != null) {
+ executor.execute(new UpdateEndpoint(node));
+ return;
+ }
+ }
+ for (DataObject dao : change.getUpdatedData().values()) {
+ if (!(dao instanceof Node))
+ continue;
+ Node node = (Node) dao;
+ if (node.getNodeConnector() != null) {
+ executor.execute(new UpdateEndpoint(node));
+ return;
+ }
+ }
+ }
+ }
+
+ private class UpdateEndpoint implements Runnable {
+ private final Node node;
+ private final InstanceIdentifier<Endpoints> endpointsIid;
+
+ public UpdateEndpoint(Node node) {
+ this.node = node;
+ this.endpointsIid=InstanceIdentifier.builder(Endpoints.class).build();
+ }
+ @Override
+ public void run() {
+ Optional<Endpoints> epResult;
+ EpKey epKey=null;
+ for (NodeConnector nc : node.getNodeConnector()) {
+ FlowCapableNodeConnector fcnc = nc
+ .getAugmentation(FlowCapableNodeConnector.class);
+ try {
+ epResult = dataProvider.newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL, endpointsIid).get();
+ if(epResult.isPresent()) {
+ Endpoints endpoints = epResult.get();
+ if(endpoints.getEndpoint() != null) {
+ WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
+ Boolean isEmpty = true;
+ for (Endpoint ep : endpoints.getEndpoint()){
+ // 2. Search for portname
+ OfOverlayContext currentAugmentation = ep.getAugmentation(OfOverlayContext.class);
+ if(ep.getPortName().getValue().equals(fcnc.getName())) {
+ NodeId nodeId;
+ NodeConnectorId nodeConnectorId;
+ try {
+ nodeId=currentAugmentation.getNodeId();
+ nodeConnectorId=currentAugmentation.getNodeConnectorId();
+ } catch (Exception e) {
+ nodeId = null;
+ nodeConnectorId = null;
+ }
+ Boolean process=false;
+ if(nodeId==null && nodeConnectorId ==null) {
+ LOG.debug("ep NodeID and NC ID Both null");
+ process=true;
+ }
+ if(nodeId!=null && nodeConnectorId !=null) {
+ if (!(nodeConnectorId.getValue().equals(nc.getId().getValue()))) {
+ LOG.debug("ep NodeID and NC ID Both NOT null but epNCID !=nodeNCID");
+ process=true;
+ }
+ }
+ if(process) {
+ // 3. Update endpoint
+ EndpointBuilder epBuilder = new EndpointBuilder(ep);
+ OfOverlayContextBuilder ofOverlayAugmentation = new OfOverlayContextBuilder();
+ ofOverlayAugmentation.setNodeId(node.getId());
+ ofOverlayAugmentation.setNodeConnectorId(nc.getId());
+ epBuilder.addAugmentation(OfOverlayContext.class,ofOverlayAugmentation.build());
+ //TODO Hack to remove:
+ List<L3Address> l3Addresses= new ArrayList<>();
+ for(L3Address l3Address: ep.getL3Address()) {
+ L3AddressBuilder l3AB = new L3AddressBuilder();
+ l3AB.setIpAddress(l3Address.getIpAddress()).setL3Context(l3Address.getL3Context());
+ l3Addresses.add(l3AB.build());
+ }
+ epBuilder.setL3Address(l3Addresses);
+ InstanceIdentifier<Endpoint> iidEp = InstanceIdentifier.builder(Endpoints.class).child(Endpoint.class,ep.getKey()).build();
+ tx.put(LogicalDatastoreType.OPERATIONAL, iidEp, epBuilder.build());
+ epKey=new EpKey(ep.getKey().getL2Context(),ep.getKey().getMacAddress());
+ LOG.debug("Values:");
+ LOG.debug("node: Node ID:"+node.getId().getValue());
+ LOG.debug("node: NodeConnectorID: "+nc.getId().getValue());
+ if(nodeId!=null && nodeConnectorId != null) {
+ LOG.debug("ep: nodeID:"+nodeId.getValue());
+ LOG.debug("ep: nodeConnectorID:"+nodeConnectorId.getValue());
+ }
+ isEmpty=false;
+ }
+ }
+ }
+ if(!isEmpty) {
+ CheckedFuture<Void, TransactionCommitFailedException> f = tx.submit();
+ notifyEndpointUpdated(epKey);
+ Futures.addCallback(f, new FutureCallback<Void>() {
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Could not over-write endpoint with augmentation", t);
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ LOG.debug("Success over-writing endpoint augmentation");
+ }
+ });
+ } else {
+ LOG.debug("UpdateEndpoint: Empty list");
+ }
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ LOG.warn("Caught exception in UpdateEndpoint");
+ }
+ }
+ }
+ }
// **************
// Implementation
// **************
if (notifyNewEg)
notifyGroupEndpointUpdated(newKey, epKey);
}
+
+ private OfOverlayContextBuilder checkAugmentation(RegisterEndpointInput input) {
+ OfOverlayContextBuilder ictxBuilder=new OfOverlayContextBuilder();
+ OfOverlayContextInput ictx =null;
+
+ ictx = input.getAugmentation(OfOverlayContextInput.class);
+ if(ictx!=null) {
+ ictxBuilder.setNodeConnectorId(ictx.getNodeConnectorId());
+ ictxBuilder.setNodeId(ictx.getNodeId());
+ } else {
+ NodeInfo augmentation = fetchAugmentation(input.getPortName().getValue());
+ if(augmentation != null) {
+ ictxBuilder.setNodeId(augmentation.getNode().getId());
+ ictxBuilder.setNodeConnectorId(augmentation.getNodeConnector().getId());
+ }
+ }
+ return ictxBuilder;
+ }
+
+ // A wrapper class around node, noeConnector info so we can pass a final
+ // object inside OnSuccess anonymous inner class
+ private static class NodeInfo {
+ NodeConnector nodeConnector;
+ Node node;
+
+ private NodeInfo() {
+
+ }
+
+ private NodeInfo(NodeConnector nc, Node node) {
+ this.nodeConnector = nc;
+ this.node = node;
+ }
+
+ private Node getNode() {
+ return this.node;
+ }
+
+ private NodeConnector getNodeConnector() {
+ return this.nodeConnector;
+ }
+
+ public void setNodeConnector(NodeConnector nodeConnector) {
+ this.nodeConnector = nodeConnector;
+ }
+
+ public void setNode(Node node) {
+ this.node = node;
+ }
+ }
+
+ private NodeInfo fetchAugmentation(String portName) {
+ NodeInfo nodeInfo=null;
+
+ if (dataProvider != null) {
+
+ Optional<Nodes> result;
+ try {
+ result = dataProvider
+ .newReadOnlyTransaction().read(
+ LogicalDatastoreType.OPERATIONAL, nodesIid).get();
+ if (result.isPresent()) {
+ Nodes nodes = result.get();
+ for (Node node : nodes.getNode()) {
+ if (node.getNodeConnector() != null) {
+ boolean found = false;
+ for (NodeConnector nc : node.getNodeConnector()) {
+ FlowCapableNodeConnector fcnc = nc
+ .getAugmentation(FlowCapableNodeConnector.class);
+ if (fcnc.getName().equals(portName)) {
+ nodeInfo=new NodeInfo();
+ nodeInfo.setNode(node);
+ nodeInfo.setNodeConnector(nc);
+ found=true;
+ break;
+ }
+ }
+ if(found) break;
+ }
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ return nodeInfo;
+ }
}