/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.groupbasedpolicy.renderer.opflex; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledExecutorService; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.groupbasedpolicy.endpoint.AbstractEndpointRegistry; import org.opendaylight.groupbasedpolicy.endpoint.EpKey; import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint; import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker; import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage; import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointDeclareRequest; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointDeclareResponse; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointIdentity; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointResolveResponse; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUndeclareRequest; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUndeclareResponse; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUnresolveRequest; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUnresolveResponse; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUpdateRequest; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointResolveRequest; import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.ManagedObject; import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.MitLib; import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.PolicyUri; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContext; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextL3; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextL3Builder; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Keep track of endpoints on the system. Maintain an index of endpoints * and their locations for queries from agents. The endpoint manager will maintain * appropriate indexes only for agents that are attached to the current * controller node. * * In order to render the policy, we need to be able to efficiently enumerate * all endpoints on a particular agent and also all the agents containing * each particular endpoint group * @author tbachman */ public class EndpointManager extends AbstractEndpointRegistry implements AutoCloseable, DataChangeListener, RpcBroker.RpcCallback, EprContext.EprCtxCallback { protected static final Logger LOG = LoggerFactory.getLogger(EndpointManager.class); private static final InstanceIdentifier endpointsIid = InstanceIdentifier.builder(Endpoints.class) .child(Endpoint.class).build(); private static final InstanceIdentifier endpointsL3Iid = InstanceIdentifier.builder(Endpoints.class) .child(EndpointL3.class).build(); final ListenerRegistration listenerReg; final ListenerRegistration listenerL3Reg; private final OpflexConnectionService connectionService; private final MitLib mitLibrary; final ConcurrentHashMap endpoints; private ConcurrentHashMap> epSubscriptions; private RpcMessageMap messageMap = null; public EndpointManager(DataBroker dataProvider, RpcProviderRegistry rpcRegistry, ScheduledExecutorService executor, OpflexConnectionService connectionService, MitLib opflexLibrary) { super(dataProvider, rpcRegistry, executor); if (dataProvider != null) { listenerReg = dataProvider .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, endpointsIid, this, DataChangeScope.ONE); listenerL3Reg = dataProvider .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, endpointsL3Iid, this, DataChangeScope.ONE); } else { listenerReg = null; listenerL3Reg = null; } this.connectionService = connectionService; this.mitLibrary = opflexLibrary; endpoints = new ConcurrentHashMap(); epSubscriptions = new ConcurrentHashMap>(); /* Subscribe to EPR messages */ messageMap = new RpcMessageMap(); List messages = Role.ENDPOINT_REGISTRY.getMessages(); messageMap.addList(messages); for (RpcMessage msg: messages) { this.connectionService.subscribe(msg, this); } LOG.trace("Initialized OpFlex endpoint manager"); } /** * Shut down the {@link EndpointManager} * */ public void shutdown() { } // *************** // EndpointManager // *************** /** * Get the endpoint object for the given key * @param epKey the key * @return the {@link Endpoint} corresponding to the key */ public Endpoint getEndpoint(EpKey epKey) { return endpoints.get(epKey); } // ************************ // AbstractEndpointRegistry // ************************ @Override protected EndpointBuilder buildEndpoint(RegisterEndpointInput input) { OpflexOverlayContextInput ictx = input.getAugmentation(OpflexOverlayContextInput.class); return super.buildEndpoint(input) .addAugmentation(OpflexOverlayContext.class, new OpflexOverlayContextBuilder(ictx).build()); } @Override protected EndpointL3Builder buildEndpointL3(RegisterEndpointInput input) { OpflexOverlayContextInput ictx = input.getAugmentation(OpflexOverlayContextInput.class); return super.buildEndpointL3(input) .addAugmentation(OpflexOverlayContextL3.class, new OpflexOverlayContextL3Builder(ictx).build()); } // ************* // AutoCloseable // ************* @Override public void close() throws Exception { if (listenerReg != null) listenerReg.close(); super.close(); } // ****************** // DataChangeListener // ****************** @Override public void onDataChanged(AsyncDataChangeEvent, DataObject> change) { for (DataObject dao : change.getCreatedData().values()) { if (dao instanceof Endpoint) updateEndpoint(null, dao); } for (InstanceIdentifier iid : change.getRemovedPaths()) { DataObject old = change.getOriginalData().get(iid); if (old != null && old instanceof Endpoint) updateEndpoint(old, null); } Map,DataObject> d = change.getUpdatedData(); for (Entry, DataObject> entry : d.entrySet()) { if ((!(entry.getValue() instanceof Endpoint)) && (!(entry.getValue() instanceof EndpointL3))) continue; DataObject old = change.getOriginalData().get(entry.getKey()); DataObject oldEp = null; if (entry instanceof Endpoint || entry instanceof EndpointL3) { if (old != null && old instanceof Endpoint) oldEp = old; updateEndpoint(oldEp, entry.getValue()); } } } // ************** // Implementation // ************** private Identity getIdentity(DataObject obj) { Identity id = null; if (obj instanceof Endpoint) { Endpoint ep = (Endpoint)obj; id = new Identity(ep); id.setContext(ep.getL2Context().getValue()); } if (obj instanceof EndpointL3) { EndpointL3 ep = (EndpointL3)obj; id = new Identity(ep); id.setContext(ep.getL3Context().getValue()); } if (id != null && !id.valid()) { return null; } return id; } private synchronized Set getEpSubscriptions(String id) { return epSubscriptions.get(id); } /** * Provide endpoint policy update messages based on changes */ protected void updateEndpoint(DataObject oldEp, DataObject newEp) { Identity oldId = getIdentity(oldEp); Identity newId = getIdentity(newEp); /* * If an endpoint has changed, we need to provide notifications * to agents that have subscribed to that endpoint. Batch up * the notifications to be sent to the agents. */ Queue updateQ = new ConcurrentLinkedQueue(); /* This covers additions or updates */ if (newId != null) { Set agentList = getEpSubscriptions(newId.identityAsString()); if (agentList != null) { for (String agentId : agentList) { OpflexAgent agent = connectionService.getOpflexAgent(agentId); if (agent != null) { updateQ.add(new EndpointUpdate(EndpointUpdate.UpdateType.ADD_CHANGE, agent.getEndpoint(), newEp)); } } } } /* this covers deletions */ if ((newId == null) && (oldId != null)) { Set agentList = getEpSubscriptions(oldId.identityAsString()); if (agentList != null) { for (String agentId : agentList) { OpflexAgent agent = connectionService.getOpflexAgent(agentId); if (agent != null) { updateQ.add(new EndpointUpdate(EndpointUpdate.UpdateType.DELETE, agent.getEndpoint(), oldEp)); } } } } sendEpUpdates(updateQ); } private static class EndpointUpdate implements Runnable { public static enum UpdateType { ADD_CHANGE("add_change"), DELETE("delete"); private String updateType; UpdateType(String updateType) { this.updateType = updateType; } @Override public String toString() { return this.updateType; } } private final UpdateType type; private final JsonRpcEndpoint agent; private final ManagedObject mo; EndpointUpdate(UpdateType type, JsonRpcEndpoint agent, DataObject obj) { this.type = type; this.agent = agent; mo = MessageUtils.getMoFromEp(obj); } @Override public void run() { EndpointUpdateRequest request = new EndpointUpdateRequest(); EndpointUpdateRequest.Params params = new EndpointUpdateRequest.Params(); List paramList = new ArrayList(); // TODO: how do we get delete URIs from the // normalized policy? List delete_uri = new ArrayList(); List replace = new ArrayList(); if (mo != null) { replace.add(mo); delete_uri.add(mo.getUri()); } if (type == EndpointUpdate.UpdateType.ADD_CHANGE) { params.setReplace(replace); } else if (type == EndpointUpdate.UpdateType.DELETE) { params.setDelete_uri(delete_uri); } paramList.add(params); request.setParams(paramList); try { agent.sendRequest(request); } catch (Throwable t) { } } } private void sendEpUpdates(Queue updateQ) { while (!updateQ.isEmpty()) { executor.execute(updateQ.remove()); } } /** * Create an Endpoint Registry Context for an OpFlex * Request message. * * @param agent * @param request * @param dataProvider * @param executor * @return */ public EprContext create(JsonRpcEndpoint agent, RpcMessage message, DataBroker dataProvider, ScheduledExecutorService executor) { EprContext ec = null; if (message instanceof EndpointDeclareRequest) { EndpointDeclareRequest request = (EndpointDeclareRequest)message; /* * There theoretically could be a list of parameters, * but we'll likely only ever see one element. */ ec = new EprContext(agent, request, dataProvider, executor); for (EndpointDeclareRequest.Params params : request.getParams()) { int prr = params.getPrr(); /* * We have a list of endpoints, so iterate through the * list and register each one, extracting the identities * for registration. */ List endpoints = params.getEndpoint(); if (endpoints != null) { for (ManagedObject mo: endpoints) { EprOperation eo = MessageUtils.getEprOpFromEpMo(mo, prr, agent.getIdentifier()); ec.addOperation(eo); } } } } else if (message instanceof EndpointUndeclareRequest) { EndpointUndeclareRequest request = (EndpointUndeclareRequest)message; ec = new EprContext(agent, request, dataProvider, executor); for (EndpointUndeclareRequest.Params params : request.getParams()) { /* * A single URI is provided per param in Undeclare messages */ String subject = params.getSubject(); Uri uri = params.getEndpoint_uri(); if (uri != null) { EprOperation op = MessageUtils.getEprOpFromUri(uri, subject); ec.addOperation(op); } } } else if (message instanceof EndpointResolveRequest) { EndpointResolveRequest request = (EndpointResolveRequest)message; ec = new EprContext(agent, request, dataProvider, executor); for (EndpointResolveRequest.Params params: request.getParams()) { /* * The resolve message contains either the URI * or a context/URI and an identifier. There is only * one of these per param array entry. */ EndpointIdentity eid = params.getEndpoint_ident(); String subject = params.getSubject(); if (eid != null) { EprOperation op = MessageUtils.getEprOpFromEpId(eid, subject); ec.addOperation(op); } else { /* * Extract the list to add the EP to from * the URI */ Uri uri = params.getEndpoint_uri(); if (uri != null) { EprOperation op = MessageUtils.getEprOpFromUri(uri, subject); ec.addOperation(op); } } } } return ec; } private synchronized void addEpSubscription(JsonRpcEndpoint agent, String id) { Set agents = epSubscriptions.get(id); if (agents == null) { agents = Collections.newSetFromMap(new ConcurrentHashMap()); Set result = epSubscriptions.putIfAbsent(id, agents); if (result != null) { agents = result; } } agents.add(agent.getIdentifier()); } private synchronized void removeEpSubscription(JsonRpcEndpoint agent, String id) { Set agents = epSubscriptions.get(id); if (agents != null) { agents.remove(id); } } /** * This notification handles the OpFlex Endpoint messages. * We should only receive request messages. Responses are * sent in a different context, as all requests result * in a Future to access the data store. * * @param agent The JsonRpcEndpoint that received the request * @param request The request message from the endpoint */ @Override public void callback(JsonRpcEndpoint agent, RpcMessage request) { if (messageMap.get(request.getMethod()) == null) { LOG.warn("message {} was not subscribed to, but was delivered.", request); return; } /* * For declaration requests, we need to make sure that this * EP is in our registry. Since we can have multiple identifiers, * we create a Set of endpoints. */ if (request instanceof EndpointDeclareRequest) { EndpointDeclareRequest req = (EndpointDeclareRequest)request; /* * valid() ensures presence of params and MOs, so we know those * won't be null */ if (!req.valid()) { LOG.warn("Invalid declaration request: {}", req); // TODO: should return error reply? return; } /* * OpFlex EP declaration/registration is different from * REST EP declaration/registration -- REST only allows * a single EP to be registered at a time. Since each MO * represents an Endpoint that's being declared, we need * add each one up separately,yet provide a single response. * We also want to know the result of the registration so * we can provide the appropriate response. We create a * context for the Endpoint Registry interaction, where * we can track the status of all the EP registrations, * and provide a response when all have completed. */ EprContext ctx = create(agent, req, dataProvider, executor); ctx.setCallback(this); ctx.createEp(); } else if (request instanceof EndpointUndeclareRequest) { EndpointUndeclareRequest req = (EndpointUndeclareRequest)request; /* * valid() ensures presence of params and URIs, so we know those * won't be null */ if (!req.valid()) { LOG.warn("Invalid declaration request: {}", req); // TODO: should return error reply? return; } /* * OpFlex EP undeclaration/unregistration is different from * REST EP declaration/registration -- REST only allows * a single EP to be unregistered at a time. Since each MO * represents an Endpoint that's being undeclared, we need * add each one up separately,yet provide a single response. * We also want to know the result of the unregistration so * we can provide the appropriate response. We create a * context for the Endpoint Registry interaction, where * we can track the status of all the EP unregistrations, * and provide a response when all have completed. */ EprContext ctx = create(agent, req, dataProvider, executor); ctx.setCallback(this); ctx.deleteEp(); } else if (request instanceof EndpointResolveRequest) { EndpointResolveRequest req = (EndpointResolveRequest)request; if (!req.valid()) { LOG.warn("Invalid endpoint request: {}", req); // TODO: should return error reply? return; } List paramList = req.getParams(); for (EndpointResolveRequest.Params param: paramList) { EprContext ctx = create(agent, req, dataProvider, executor); /* * We query the EPR for the EP. This is an asynchronous * operation, so we send the response in the callback */ ctx.setCallback(this); ctx.lookupEndpoint(); /* * A request is effectively a subscription. Add this agent * to the set of listeners. */ Identity id = null; if (param.getEndpoint_ident() != null) { id = new Identity(param.getEndpoint_ident().getIdentifier()); } else if (param.getEndpoint_uri() != null) { PolicyUri puri = new PolicyUri(param.getEndpoint_uri().getValue()); id = new Identity(puri.pop()); } else { // TOOD: should return error reply return; } addEpSubscription(agent, id.identityAsString()); } } else if (request instanceof EndpointUnresolveRequest) { EndpointUnresolveRequest req = (EndpointUnresolveRequest)request; if (!req.valid()) { LOG.warn("Invalid endpoint request: {}", req); // TODO: should return error reply? return; } List params = ((EndpointUnresolveRequest) request).getParams(); for (EndpointUnresolveRequest.Params param: params) { /* * No interaction with the Data Store is required -- just * cancel the notification subscription for this EP.. */ Identity id = null; if (param.getEndpoint_ident() != null) { id = new Identity(param.getEndpoint_ident().getIdentifier()); } else if (param.getEndpoint_uri() != null) { PolicyUri puri = new PolicyUri(param.getEndpoint_uri().getValue()); id = new Identity(puri.pop()); } else { // TOODO: should return an error return; } removeEpSubscription(agent, id.identityAsString()); } /* * No EprContext is used in unresolve -- so * just send the response directly */ EndpointUnresolveResponse resp = new EndpointUnresolveResponse(); EndpointUnresolveResponse.Result result = new EndpointUnresolveResponse.Result(); resp.setResult(result); resp.setId(req.getId()); try { agent.sendResponse(resp); } catch (Throwable t) { LOG.warn("Response {} could not be sent to {}", resp, agent); } } else { LOG.warn("Unexpected callback, {}", request); } } private class EndpointResponse implements Runnable { private EprContext ctx; private RpcMessage resp; public EndpointResponse(EprContext ctx, RpcMessage resp) { this.ctx = ctx; this.resp = resp; } @Override public void run() { try { ctx.getPeer().sendResponse(resp); } catch (Throwable t) { // TODO: what to do here } } } /** * This notification handles the callback from an interaction * with the Endpoint Registry. The context for the callback * is a notification from the data store, so so the code * has to ensure that it won't block. Responses are sent * using an executor */ @Override public void callback(EprContext ctx) { RpcMessage resp = null; if (ctx.getRequest() == null) return; if (!(ctx.getRequest() instanceof EndpointDeclareRequest) && !(ctx.getRequest() instanceof EndpointUndeclareRequest) && !(ctx.getRequest() instanceof EndpointResolveRequest)) { return; } if (ctx.getRequest() instanceof EndpointDeclareRequest) { EndpointDeclareRequest req = (EndpointDeclareRequest)ctx.getRequest(); EndpointDeclareResponse response = new EndpointDeclareResponse(); EndpointDeclareResponse.Result result = new EndpointDeclareResponse.Result(); response.setResult(result); response.setId(req.getId()); response.setError(null); // TODO: real errors resp = response; } else if (ctx.getRequest() instanceof EndpointUndeclareRequest) { EndpointUndeclareRequest req = (EndpointUndeclareRequest)ctx.getRequest(); EndpointUndeclareResponse response = new EndpointUndeclareResponse(); EndpointUndeclareResponse.Result result = new EndpointUndeclareResponse.Result(); response.setResult(result); response.setId(req.getId()); response.setError(null); // TODO: real errors resp = response; } else { EndpointResolveRequest req = (EndpointResolveRequest)ctx.getRequest(); EndpointResolveResponse response = new EndpointResolveResponse(); response.setId(req.getId()); EndpointResolveResponse.Result result = new EndpointResolveResponse.Result(); List epList = new ArrayList(); if (ctx.getOperations().size() > 0) { /* * If we get any EP, then we can * provide a response to the original request * Note that we could potentially have multiple * requests outstanding for the same EP, and * even using different context types (L2 or L3). */ for (EprOperation op: ctx.getOperations()) { ManagedObject mo = MessageUtils.getMoFromOp(op); if (mo != null) { epList.add(mo); } /* * For EPs on a different agent, we need to look up the * VTEP information. For now, we're only supporting * VXLAN VTEPs, so we look up the destination tunnel IP, * and provide that in the data field of the response */ // TODO: Need to look this up in op store //endpoint.setData(); } result.setEndpoint(epList); response.setResult(result); resp = response; } } if (resp != null) { executor.execute(new EndpointResponse(ctx, resp)); } } }