2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.opflex;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
15 import java.util.Map.Entry;
16 import java.util.Queue;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.ScheduledExecutorService;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
28 import org.opendaylight.groupbasedpolicy.endpoint.AbstractEndpointRegistry;
29 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
30 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
31 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
32 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
33 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent;
35 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService;
36 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role;
37 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointDeclareRequest;
38 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointDeclareResponse;
39 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointIdentity;
40 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointResolveResponse;
41 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUndeclareRequest;
42 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUndeclareResponse;
43 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUnresolveRequest;
44 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUnresolveResponse;
45 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUpdateRequest;
46 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointResolveRequest;
47 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.ManagedObject;
48 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.MitLib;
49 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.PolicyUri;
50 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContext;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextL3;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextL3Builder;
62 import org.opendaylight.yangtools.concepts.ListenerRegistration;
63 import org.opendaylight.yangtools.yang.binding.DataObject;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
69 * Keep track of endpoints on the system. Maintain an index of endpoints
70 * and their locations for queries from agents. The endpoint manager will maintain
71 * appropriate indexes only for agents that are attached to the current
74 * In order to render the policy, we need to be able to efficiently enumerate
75 * all endpoints on a particular agent and also all the agents containing
76 * each particular endpoint group
79 public class EndpointManager
80 extends AbstractEndpointRegistry
81 implements AutoCloseable, DataChangeListener, RpcBroker.RpcCallback,
82 EprContext.EprCtxCallback {
83 protected static final Logger LOG =
84 LoggerFactory.getLogger(EndpointManager.class);
// Instance identifiers for the Endpoint and EndpointL3 children of Endpoints.
// NOTE(review): presumably these are the paths handed to
// registerDataChangeListener in the constructor -- confirm.
86 private static final InstanceIdentifier<Endpoint> endpointsIid =
87 InstanceIdentifier.builder(Endpoints.class)
88 .child(Endpoint.class).build();
89 private static final InstanceIdentifier<EndpointL3> endpointsL3Iid =
90 InstanceIdentifier.builder(Endpoints.class)
91 .child(EndpointL3.class).build();
// Registrations obtained from registerDataChangeListener in the constructor
// when a DataBroker is supplied; close() checks listenerReg for null.
93 final ListenerRegistration<DataChangeListener> listenerReg;
94 final ListenerRegistration<DataChangeListener> listenerL3Reg;
// Used to subscribe to EPR messages and to look up agents by identifier.
96 private final OpflexConnectionService connectionService;
// Holds the MitLib passed to the constructor as opflexLibrary.
97 private final MitLib mitLibrary;
// Endpoint index read by getEndpoint(EpKey).
99 final ConcurrentHashMap<EpKey, Endpoint> endpoints;
// Maps an endpoint identity string to the identifiers of the agents that
// subscribed to that endpoint (see addEpSubscription / updateEndpoint).
101 private ConcurrentHashMap<String, Set<String>> epSubscriptions;
// Filled in the constructor with the ENDPOINT_REGISTRY role messages and
// consulted by method name in callback(JsonRpcEndpoint, RpcMessage).
102 private RpcMessageMap messageMap = null;
/**
 * Creates the endpoint manager, registers data change listeners on the
 * operational data store when a data broker is available, and subscribes
 * this instance to every message of the ENDPOINT_REGISTRY role.
 *
 * @param dataProvider data broker; passed to the superclass and, when
 *        non-null, used to register two data change listeners
 * @param rpcRegistry passed to the superclass constructor
 * @param executor passed to the superclass constructor
 * @param connectionService service used to subscribe to EPR messages
 * @param opflexLibrary stored in the mitLibrary field
 */
104 public EndpointManager(DataBroker dataProvider,
105 RpcProviderRegistry rpcRegistry,
106 ScheduledExecutorService executor,
107 OpflexConnectionService connectionService,
108 MitLib opflexLibrary) {
109 super(dataProvider, rpcRegistry, executor);
// NOTE(review): the path and listener arguments of both registrations are
// not visible here; presumably endpointsIid / endpointsL3Iid with this
// instance as the listener -- confirm.
111 if (dataProvider != null) {
112 listenerReg = dataProvider
113 .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
116 DataChangeScope.ONE);
117 listenerL3Reg = dataProvider
118 .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
121 DataChangeScope.ONE);
// NOTE(review): presumably the branch for a null data broker; listenerReg
// is final and must be assigned there as well -- confirm.
124 listenerL3Reg = null;
127 this.connectionService = connectionService;
128 this.mitLibrary = opflexLibrary;
130 endpoints = new ConcurrentHashMap<EpKey, Endpoint>();
131 epSubscriptions = new ConcurrentHashMap<String, Set<String>>();
133 /* Subscribe to EPR messages */
134 messageMap = new RpcMessageMap();
135 List<RpcMessage> messages = Role.ENDPOINT_REGISTRY.getMessages();
136 messageMap.addList(messages);
// Every message of the role is delivered to callback(JsonRpcEndpoint, RpcMessage).
137 for (RpcMessage msg: messages) {
138 this.connectionService.subscribe(msg, this);
140 LOG.trace("Initialized OpFlex endpoint manager");
144 * Shut down the {@link EndpointManager}
147 public void shutdown() {
156 * Get the endpoint object for the given key
157 * @param epKey the key
158 * @return the {@link Endpoint} corresponding to the key
// Plain lookup in the endpoints map; the result is null when the map has
// no entry for epKey.
160 public Endpoint getEndpoint(EpKey epKey) {
161 return endpoints.get(epKey);
164 // ************************
165 // AbstractEndpointRegistry
166 // ************************
/**
 * Extends the endpoint builder produced by the superclass with an
 * OpflexOverlayContext augmentation built from the
 * OpflexOverlayContextInput augmentation of the registration input.
 *
 * NOTE(review): ictx is passed to the builder without a null check;
 * confirm how OpflexOverlayContextBuilder treats a missing augmentation.
 *
 * @param input the endpoint registration input
 * @return the superclass builder with the overlay augmentation added
 */
169 protected EndpointBuilder buildEndpoint(RegisterEndpointInput input) {
170 OpflexOverlayContextInput ictx =
171 input.getAugmentation(OpflexOverlayContextInput.class);
172 return super.buildEndpoint(input)
173 .addAugmentation(OpflexOverlayContext.class,
174 new OpflexOverlayContextBuilder(ictx).build());
/**
 * Extends the L3 endpoint builder produced by the superclass with an
 * OpflexOverlayContextL3 augmentation built from the
 * OpflexOverlayContextInput augmentation of the registration input.
 *
 * NOTE(review): ictx is passed to the builder without a null check;
 * confirm how OpflexOverlayContextL3Builder treats a missing augmentation.
 *
 * @param input the endpoint registration input
 * @return the superclass builder with the overlay augmentation added
 */
178 protected EndpointL3Builder buildEndpointL3(RegisterEndpointInput input) {
179 OpflexOverlayContextInput ictx =
180 input.getAugmentation(OpflexOverlayContextInput.class);
181 return super.buildEndpointL3(input)
182 .addAugmentation(OpflexOverlayContextL3.class,
183 new OpflexOverlayContextL3Builder(ictx).build());
// Releases the data change listener registration when one exists.
// NOTE(review): only listenerReg is visibly closed here; confirm that
// listenerL3Reg is closed as well, since it is registered in the constructor.
191 public void close() throws Exception {
192 if (listenerReg != null) listenerReg.close();
196 // ******************
197 // DataChangeListener
198 // ******************
201 public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
202 for (DataObject dao : change.getCreatedData().values()) {
203 if (dao instanceof Endpoint)
204 updateEndpoint(null, dao);
206 for (InstanceIdentifier<?> iid : change.getRemovedPaths()) {
207 DataObject old = change.getOriginalData().get(iid);
208 if (old != null && old instanceof Endpoint)
209 updateEndpoint(old, null);
211 Map<InstanceIdentifier<?>,DataObject> d = change.getUpdatedData();
212 for (Entry<InstanceIdentifier<?>, DataObject> entry : d.entrySet()) {
213 if ((!(entry.getValue() instanceof Endpoint)) &&
214 (!(entry.getValue() instanceof EndpointL3))) continue;
215 DataObject old = change.getOriginalData().get(entry.getKey());
216 DataObject oldEp = null;
217 if (entry instanceof Endpoint ||
218 entry instanceof EndpointL3) {
219 if (old != null && old instanceof Endpoint)
221 updateEndpoint(oldEp, entry.getValue());
/**
 * Derives an Identity from an endpoint object. An Endpoint uses the value
 * of its L2 context as the identity context, an EndpointL3 the value of
 * its L3 context. Both instanceof tests are false for null, which is how
 * updateEndpoint passes "no endpoint".
 *
 * NOTE(review): the declaration of id, the handling of an identity that
 * fails valid(), and the return statements are not visible here;
 * presumably null is returned for null, unsupported or invalid input --
 * confirm.
 *
 * @param obj an Endpoint, an EndpointL3, or null
 */
229 private Identity getIdentity(DataObject obj) {
231 if (obj instanceof Endpoint) {
232 Endpoint ep = (Endpoint)obj;
233 id = new Identity(ep);
234 id.setContext(ep.getL2Context().getValue());
237 if (obj instanceof EndpointL3) {
238 EndpointL3 ep = (EndpointL3)obj;
239 id = new Identity(ep);
240 id.setContext(ep.getL3Context().getValue());
242 if (id != null && !id.valid()) {
// Returns the set stored in epSubscriptions for the given identity string
// (the stored set itself, not a copy), or null when there is no entry.
// Synchronized on this instance, like addEpSubscription and
// removeEpSubscription.
248 private synchronized Set<String> getEpSubscriptions(String id) {
249 return epSubscriptions.get(id);
253 * Provide endpoint policy update messages based on changes
/**
 * Queues one EndpointUpdate per subscribed agent for a changed endpoint
 * and hands the queue to sendEpUpdates. Subscribers are looked up by the
 * identity string of the new endpoint (ADD_CHANGE) or, when only the old
 * endpoint has an identity, of the old endpoint (DELETE).
 *
 * NOTE(review): the guard in front of the ADD_CHANGE section and the
 * remaining constructor arguments of both EndpointUpdate instances are
 * not visible here; newId.identityAsString() requires newId to be
 * non-null -- confirm that the guard exists.
 *
 * @param oldEp previous endpoint state, or null for a creation
 * @param newEp new endpoint state, or null for a removal
 */
255 protected void updateEndpoint(DataObject oldEp, DataObject newEp) {
256 Identity oldId = getIdentity(oldEp);
257 Identity newId = getIdentity(newEp);
259 * If an endpoint has changed, we need to provide notifications
260 * to agents that have subscribed to that endpoint. Batch up
261 * the notifications to be sent to the agents.
263 Queue<EndpointUpdate> updateQ = new ConcurrentLinkedQueue<EndpointUpdate>();
265 /* This covers additions or updates */
267 Set<String> agentList = getEpSubscriptions(newId.identityAsString());
268 if (agentList != null) {
269 for (String agentId : agentList) {
270 OpflexAgent agent = connectionService.getOpflexAgent(agentId);
272 updateQ.add(new EndpointUpdate(EndpointUpdate.UpdateType.ADD_CHANGE,
279 /* this covers deletions */
280 if ((newId == null) && (oldId != null)) {
281 Set<String> agentList = getEpSubscriptions(oldId.identityAsString());
282 if (agentList != null) {
283 for (String agentId : agentList) {
284 OpflexAgent agent = connectionService.getOpflexAgent(agentId);
286 updateQ.add(new EndpointUpdate(EndpointUpdate.UpdateType.DELETE,
294 sendEpUpdates(updateQ);
/**
 * Runnable that sends one endpoint update request to one agent. The
 * update type selects which part of the request parameters is filled:
 * ADD_CHANGE sets the replace list, DELETE sets the delete URI list.
 *
 * NOTE(review): the DELETE enum constant, the assignments of type and
 * agent in the constructor, the run() signature, the statement that
 * fills the replace list and the body of the catch handler are not
 * visible here -- confirm against the full file.
 */
297 private static class EndpointUpdate implements Runnable {
298 public static enum UpdateType {
299 ADD_CHANGE("add_change"),
302 private String updateType;
304 UpdateType(String updateType) {
305 this.updateType = updateType;
// Returns the string given to the constant, e.g. "add_change".
309 public String toString() {
310 return this.updateType;
314 private final UpdateType type;
315 private final JsonRpcEndpoint agent;
316 private final ManagedObject mo;
317 EndpointUpdate(UpdateType type, JsonRpcEndpoint agent, DataObject obj) {
// The endpoint object is converted to a managed object once, at construction.
320 mo = MessageUtils.getMoFromEp(obj);
325 EndpointUpdateRequest request =
326 new EndpointUpdateRequest();
327 EndpointUpdateRequest.Params params =
328 new EndpointUpdateRequest.Params();
329 List<EndpointUpdateRequest.Params> paramList =
330 new ArrayList<EndpointUpdateRequest.Params>();
332 // TODO: how do we get delete URIs from the
333 // normalized policy?
334 List<Uri> delete_uri = new ArrayList<Uri>();
335 List<ManagedObject> replace = new ArrayList<ManagedObject>();
338 delete_uri.add(mo.getUri());
// Only the list matching the update type is attached to the parameters.
340 if (type == EndpointUpdate.UpdateType.ADD_CHANGE) {
341 params.setReplace(replace);
343 else if (type == EndpointUpdate.UpdateType.DELETE) {
344 params.setDelete_uri(delete_uri);
347 paramList.add(params);
348 request.setParams(paramList);
350 agent.sendRequest(request);
352 catch (Throwable t) {
// Drains the queue, handing each EndpointUpdate to the executor in queue
// order; the queue is empty afterwards.
360 private void sendEpUpdates(Queue<EndpointUpdate> updateQ) {
361 while (!updateQ.isEmpty()) {
362 executor.execute(updateQ.remove());
367 * Create an Endpoint Registry Context for an OpFlex
372 * @param dataProvider
/**
 * Builds an EprContext for an endpoint registry request and derives one
 * EprOperation per endpoint named in the request:
 * declare requests yield one operation per managed object (with the prr
 * value of the enclosing params and the agent identifier), undeclare
 * requests one operation per params entry from its URI and subject, and
 * resolve requests one operation per params entry from either the
 * endpoint identity or the endpoint URI.
 *
 * NOTE(review): the declaration of the message parameter, the conditions
 * choosing between identity and URI in the resolve branch, what is done
 * with each created operation (presumably it is added to the context) and
 * the return statement (presumably ec, which stays null for other message
 * types) are not visible here -- confirm against the full file.
 */
376 public EprContext create(JsonRpcEndpoint agent,
378 DataBroker dataProvider,
379 ScheduledExecutorService executor) {
381 EprContext ec = null;
383 if (message instanceof EndpointDeclareRequest) {
384 EndpointDeclareRequest request = (EndpointDeclareRequest)message;
386 * There theoretically could be a list of parameters,
387 * but we'll likely only ever see one element.
389 ec = new EprContext(agent, request, dataProvider, executor);
390 for (EndpointDeclareRequest.Params params : request.getParams()) {
392 int prr = params.getPrr();
395 * We have a list of endpoints, so iterate through the
396 * list and register each one, extracting the identities
399 List<ManagedObject> endpoints = params.getEndpoint();
400 if (endpoints != null) {
401 for (ManagedObject mo: endpoints) {
403 MessageUtils.getEprOpFromEpMo(mo, prr, agent.getIdentifier());
409 else if (message instanceof EndpointUndeclareRequest) {
410 EndpointUndeclareRequest request = (EndpointUndeclareRequest)message;
411 ec = new EprContext(agent, request, dataProvider, executor);
412 for (EndpointUndeclareRequest.Params params : request.getParams()) {
415 * A single URI is provided per param in Undeclare messages
417 String subject = params.getSubject();
418 Uri uri = params.getEndpoint_uri();
420 EprOperation op = MessageUtils.getEprOpFromUri(uri, subject);
425 else if (message instanceof EndpointResolveRequest) {
426 EndpointResolveRequest request = (EndpointResolveRequest)message;
427 ec = new EprContext(agent, request, dataProvider, executor);
428 for (EndpointResolveRequest.Params params: request.getParams()) {
430 * The resolve message contains either the URI
431 * or a context/URI and an identifier. There is only
432 * one of these per param array entry.
434 EndpointIdentity eid = params.getEndpoint_ident();
436 String subject = params.getSubject();
440 EprOperation op = MessageUtils.getEprOpFromEpId(eid, subject);
445 * Extract the list to add the EP to from
448 Uri uri = params.getEndpoint_uri();
450 EprOperation op = MessageUtils.getEprOpFromUri(uri, subject);
/**
 * Records that an agent is subscribed to an endpoint identity. The set of
 * agent identifiers for the identity is created on first use as a set
 * backed by a ConcurrentHashMap and published with putIfAbsent.
 *
 * NOTE(review): the statement executed when putIfAbsent returns an
 * existing set is not visible here; presumably that existing set is used
 * instead of the new one -- confirm.
 *
 * @param agent the subscribing agent; its identifier is stored
 * @param id the endpoint identity string used as the map key
 */
459 private synchronized void addEpSubscription(JsonRpcEndpoint agent, String id) {
460 Set<String> agents = epSubscriptions.get(id);
461 if (agents == null) {
462 agents = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
463 Set<String> result = epSubscriptions.putIfAbsent(id, agents);
464 if (result != null) {
468 agents.add(agent.getIdentifier());
// Looks up the subscriber set for the endpoint identity and acts on it
// only when one exists.
// NOTE(review): the statement inside the null check is not visible here;
// presumably it removes agent.getIdentifier() from the set -- confirm.
471 private synchronized void removeEpSubscription(JsonRpcEndpoint agent, String id) {
472 Set<String> agents = epSubscriptions.get(id);
473 if (agents != null) {
479 * This notification handles the OpFlex Endpoint messages.
480 * We should only receive request messages. Responses are
481 * sent in a different context, as all requests result
482 * in a Future to access the data store.
484 * @param agent The JsonRpcEndpoint that received the request
485 * @param request The request message from the endpoint
// Dispatches an incoming OpFlex endpoint request by its concrete type.
//  - A message whose method is not in messageMap is logged at warn level.
//  - Declare and undeclare requests get an EprContext from create() with
//    this instance installed as its callback.
//  - Resolve requests get one EprContext per params entry, a lookup through
//    lookupEndpoint(), and a subscription of the agent to the endpoint
//    identity (taken from the endpoint identifier, or from the value popped
//    off the endpoint URI).
//  - Unresolve requests remove the subscription for every params entry and
//    are answered directly with an EndpointUnresolveResponse carrying the
//    request id; a failure to send is logged at warn level.
//  - Any other message type is logged as an unexpected callback.
// NOTE(review): the validity checks guarding the "Invalid ..." warnings,
// the early returns, the declaration of id in the resolve and unresolve
// loops, and the calls that start the declare and undeclare operations on
// the context are not visible here -- confirm against the full file.
488 public void callback(JsonRpcEndpoint agent, RpcMessage request) {
490 if (messageMap.get(request.getMethod()) == null) {
491 LOG.warn("message {} was not subscribed to, but was delivered.", request);
496 * For declaration requests, we need to make sure that this
497 * EP is in our registry. Since we can have multiple identifiers,
498 * we create a Set of endpoints.
501 if (request instanceof EndpointDeclareRequest) {
502 EndpointDeclareRequest req = (EndpointDeclareRequest)request;
506 * valid() ensures presence of params and MOs, so we know those
510 LOG.warn("Invalid declaration request: {}", req);
511 // TODO: should return error reply?
516 * OpFlex EP declaration/registration is different from
517 * REST EP declaration/registration -- REST only allows
518 * a single EP to be registered at a time. Since each MO
519 * represents an Endpoint that's being declared, we need
520 * add each one up separately,yet provide a single response.
521 * We also want to know the result of the registration so
522 * we can provide the appropriate response. We create a
523 * context for the Endpoint Registry interaction, where
524 * we can track the status of all the EP registrations,
525 * and provide a response when all have completed.
527 EprContext ctx = create(agent, req, dataProvider, executor);
528 ctx.setCallback(this);
531 else if (request instanceof EndpointUndeclareRequest) {
532 EndpointUndeclareRequest req = (EndpointUndeclareRequest)request;
535 * valid() ensures presence of params and URIs, so we know those
539 LOG.warn("Invalid declaration request: {}", req);
540 // TODO: should return error reply?
545 * OpFlex EP undeclaration/unregistration is different from
546 * REST EP declaration/registration -- REST only allows
547 * a single EP to be unregistered at a time. Since each MO
548 * represents an Endpoint that's being undeclared, we need
549 * add each one up separately,yet provide a single response.
550 * We also want to know the result of the unregistration so
551 * we can provide the appropriate response. We create a
552 * context for the Endpoint Registry interaction, where
553 * we can track the status of all the EP unregistrations,
554 * and provide a response when all have completed.
556 EprContext ctx = create(agent, req, dataProvider, executor);
557 ctx.setCallback(this);
560 else if (request instanceof EndpointResolveRequest) {
561 EndpointResolveRequest req = (EndpointResolveRequest)request;
564 LOG.warn("Invalid endpoint request: {}", req);
565 // TODO: should return error reply?
568 List<EndpointResolveRequest.Params> paramList =
571 for (EndpointResolveRequest.Params param: paramList) {
572 EprContext ctx = create(agent, req, dataProvider, executor);
575 * We query the EPR for the EP. This is an asynchronous
576 * operation, so we send the response in the callback
578 ctx.setCallback(this);
579 ctx.lookupEndpoint();
582 * A request is effectively a subscription. Add this agent
583 * to the set of listeners.
586 if (param.getEndpoint_ident() != null) {
587 id = new Identity(param.getEndpoint_ident().getIdentifier());
589 else if (param.getEndpoint_uri() != null) {
590 PolicyUri puri = new PolicyUri(param.getEndpoint_uri().getValue());
591 id = new Identity(puri.pop());
594 // TODO: should return error reply
597 addEpSubscription(agent, id.identityAsString());
600 else if (request instanceof EndpointUnresolveRequest) {
601 EndpointUnresolveRequest req = (EndpointUnresolveRequest)request;
604 LOG.warn("Invalid endpoint request: {}", req);
605 // TODO: should return error reply?
609 List<EndpointUnresolveRequest.Params> params =
610 ((EndpointUnresolveRequest) request).getParams();
612 for (EndpointUnresolveRequest.Params param: params) {
614 * No interaction with the Data Store is required -- just
615 * cancel the notification subscription for this EP..
618 if (param.getEndpoint_ident() != null) {
619 id = new Identity(param.getEndpoint_ident().getIdentifier());
621 else if (param.getEndpoint_uri() != null) {
622 PolicyUri puri = new PolicyUri(param.getEndpoint_uri().getValue());
623 id = new Identity(puri.pop());
626 // TODO: should return an error
629 removeEpSubscription(agent, id.identityAsString());
633 * No EprContext is used in unresolve -- so
634 * just send the response directly
636 EndpointUnresolveResponse resp =
637 new EndpointUnresolveResponse();
638 EndpointUnresolveResponse.Result result =
639 new EndpointUnresolveResponse.Result();
640 resp.setResult(result);
641 resp.setId(req.getId());
643 agent.sendResponse(resp);
645 catch (Throwable t) {
646 LOG.warn("Response {} could not be sent to {}", resp, agent);
651 LOG.warn("Unexpected callback, {}", request);
// Runnable that sends a prepared response message to the peer of an
// EprContext. Any Throwable raised while sending is caught; the visible
// handler contains only a TODO.
// NOTE(review): the constructor body and the run() signature are not
// visible here; presumably the constructor stores both arguments -- confirm.
656 private class EndpointResponse implements Runnable {
658 private EprContext ctx;
659 private RpcMessage resp;
660 public EndpointResponse(EprContext ctx, RpcMessage resp) {
668 ctx.getPeer().sendResponse(resp);
669 } catch (Throwable t) {
670 // TODO: what to do here
678 * This notification handles the callback from an interaction
679 * with the Endpoint Registry. The context for the callback
680 * is a notification from the data store, so the code
681 * has to ensure that it won't block. Responses are sent
// Builds the response for a completed endpoint registry interaction and
// schedules it for sending on the executor via an EndpointResponse.
//  - A context without a request is ignored.
//  - Declare and undeclare requests are answered with an empty result, the
//    id of the request and a null error.
//  - Resolve requests are answered with the id of the request and a result
//    whose endpoint list is built from the operations of the context
//    through MessageUtils.getMoFromOp.
// NOTE(review): the handling of unsupported request types, the assignments
// of the built response to resp, the statement adding each managed object
// to epList and the handling of an empty operation list are not visible
// here -- confirm against the full file.
685 public void callback(EprContext ctx) {
686 RpcMessage resp = null;
687 if (ctx.getRequest() == null) return;
689 if (!(ctx.getRequest() instanceof EndpointDeclareRequest) &&
690 !(ctx.getRequest() instanceof EndpointUndeclareRequest) &&
691 !(ctx.getRequest() instanceof EndpointResolveRequest)) {
695 if (ctx.getRequest() instanceof EndpointDeclareRequest) {
696 EndpointDeclareRequest req =
697 (EndpointDeclareRequest)ctx.getRequest();
698 EndpointDeclareResponse response = new EndpointDeclareResponse();
699 EndpointDeclareResponse.Result result =
700 new EndpointDeclareResponse.Result();
701 response.setResult(result);
702 response.setId(req.getId());
703 response.setError(null); // TODO: real errors
706 else if (ctx.getRequest() instanceof EndpointUndeclareRequest) {
707 EndpointUndeclareRequest req =
708 (EndpointUndeclareRequest)ctx.getRequest();
709 EndpointUndeclareResponse response = new EndpointUndeclareResponse();
710 EndpointUndeclareResponse.Result result =
711 new EndpointUndeclareResponse.Result();
712 response.setResult(result);
713 response.setId(req.getId());
714 response.setError(null); // TODO: real errors
718 EndpointResolveRequest req =
719 (EndpointResolveRequest)ctx.getRequest();
720 EndpointResolveResponse response = new EndpointResolveResponse();
721 response.setId(req.getId());
722 EndpointResolveResponse.Result result =
723 new EndpointResolveResponse.Result();
724 List<ManagedObject> epList =
725 new ArrayList<ManagedObject>();
728 if (ctx.getOperations().size() > 0) {
731 * If we get any EP, then we can
732 * provide a response to the original request
733 * Note that we could potentially have multiple
734 * requests outstanding for the same EP, and
735 * even using different context types (L2 or L3).
737 for (EprOperation op: ctx.getOperations()) {
739 ManagedObject mo = MessageUtils.getMoFromOp(op);
744 * For EPs on a different agent, we need to look up the
745 * VTEP information. For now, we're only supporting
746 * VXLAN VTEPs, so we look up the destination tunnel IP,
747 * and provide that in the data field of the response
749 // TODO: Need to look this up in op store
750 //endpoint.setData();
752 result.setEndpoint(epList);
753 response.setResult(result);
758 executor.execute(new EndpointResponse(ctx, resp));