Feature uses features-parent as parent
[groupbasedpolicy.git] / renderers / opflex / src / main / java / org / opendaylight / groupbasedpolicy / renderer / opflex / EndpointManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.opflex;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Map.Entry;
16 import java.util.Queue;
17 import java.util.Set;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.ScheduledExecutorService;
21
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
28 import org.opendaylight.groupbasedpolicy.endpoint.EndpointRpcRegistry;
29 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
30 import org.opendaylight.groupbasedpolicy.endpoint.EpRendererAugmentation;
31 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.JsonRpcEndpoint;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcBroker;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessage;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessageMap;
35 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent;
36 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService;
37 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role;
38 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointDeclareRequest;
39 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointDeclareResponse;
40 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointIdentity;
41 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointResolveResponse;
42 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUndeclareRequest;
43 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUndeclareResponse;
44 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUnresolveRequest;
45 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUnresolveResponse;
46 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUpdateRequest;
47 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointResolveRequest;
48 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.ManagedObject;
49 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.MitLib;
50 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.PolicyUri;
51 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterL3PrefixEndpointInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3PrefixBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContext;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextInput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextL3;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextL3Builder;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.yang.binding.Augmentation;
67 import org.opendaylight.yangtools.yang.binding.DataObject;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 /**
73  * Keep track of endpoints on the system. Maintain an index of endpoints and
74  * their locations for queries from agents. The endpoint manager will maintain
75  * appropriate indexes only for agents that are attached to the current
76  * controller node. In order to render the policy, we need to be able to
77  * efficiently enumerate all endpoints on a particular agent and also all the
78  * agents containing each particular endpoint group
79  *
80  */
81 public class EndpointManager implements AutoCloseable, DataChangeListener, RpcBroker.RpcCallback,
82         EprContext.EprCtxCallback {
83
84     protected static final Logger LOG = LoggerFactory.getLogger(EndpointManager.class);
85
86     private static final InstanceIdentifier<Endpoint> endpointsIid = InstanceIdentifier.builder(Endpoints.class)
87             .child(Endpoint.class)
88             .build();
89     private static final InstanceIdentifier<EndpointL3> endpointsL3Iid = InstanceIdentifier.builder(Endpoints.class)
90             .child(EndpointL3.class)
91             .build();
92
93     final ListenerRegistration<DataChangeListener> listenerReg;
94     final ListenerRegistration<DataChangeListener> listenerL3Reg;
95
96     private final OpflexConnectionService connectionService;
97     private final MitLib mitLibrary;
98
99     final Map<EpKey, Endpoint> endpoints = new ConcurrentHashMap<>();
100
101     private final ConcurrentHashMap<String, Set<String>> epSubscriptions = new ConcurrentHashMap<>();
102     private RpcMessageMap messageMap = null;
103
104     final private OfEndpointAug endpointRpcAug = new OfEndpointAug();
105
106     final private ScheduledExecutorService executor;
107
108     final private DataBroker dataProvider;
109
110     public EndpointManager(DataBroker dataProvider, RpcProviderRegistry rpcRegistry, ScheduledExecutorService executor,
111             OpflexConnectionService connectionService, MitLib opflexLibrary) {
112         this.executor = executor;
113         this.dataProvider = dataProvider;
114         EndpointRpcRegistry.register(dataProvider, rpcRegistry, endpointRpcAug);
115
116         if (dataProvider != null) {
117             listenerReg = dataProvider.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, endpointsIid, this,
118                     DataChangeScope.ONE);
119             listenerL3Reg = dataProvider.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, endpointsL3Iid,
120                     this, DataChangeScope.ONE);
121         } else {
122             listenerReg = null;
123             listenerL3Reg = null;
124         }
125
126         this.connectionService = connectionService;
127         this.mitLibrary = opflexLibrary;
128
129         /* Subscribe to EPR messages */
130         messageMap = new RpcMessageMap();
131         List<RpcMessage> messages = Role.ENDPOINT_REGISTRY.getMessages();
132         messageMap.addList(messages);
133         for (RpcMessage msg : messages) {
134             this.connectionService.subscribe(msg, this);
135         }
136         LOG.trace("Initialized OpFlex endpoint manager");
137     }
138
139     /**
140      * Shut down the {@link EndpointManager}
141      */
142     public void shutdown() {
143
144     }
145
146     // ***************
147     // EndpointManager
148     // ***************
149
150     /**
151      * Get the endpoint object for the given key
152      *
153      * @param epKey
154      *            the key
155      * @return the {@link Endpoint} corresponding to the key
156      */
157     public Endpoint getEndpoint(EpKey epKey) {
158         return endpoints.get(epKey);
159     }
160
161     // ************************
162     // Endpoint Augmentation
163     // ************************
164     private class OfEndpointAug implements EpRendererAugmentation {
165
166         @Override
167         public Augmentation<Endpoint> buildEndpointAugmentation(RegisterEndpointInput input) {
168             OpflexOverlayContextInput ictx = input.getAugmentation(OpflexOverlayContextInput.class);
169             if (ictx != null)
170                 return new OpflexOverlayContextBuilder(ictx).build();
171             return null;
172         }
173
174         @Override
175         public Augmentation<EndpointL3> buildEndpointL3Augmentation(RegisterEndpointInput input) {
176             OpflexOverlayContextInput ictx = input.getAugmentation(OpflexOverlayContextInput.class);
177             if (ictx != null)
178                 return new OpflexOverlayContextL3Builder(ictx).build();
179             return null;
180         }
181
182         @Override
183         public void buildL3PrefixEndpointAugmentation(EndpointL3PrefixBuilder eb, RegisterL3PrefixEndpointInput input) {
184             // TODO These methods will be replaced by getAugmentation and
185             // augmentation applied at caller.
186
187         }
188     }
189
190     // *************
191     // AutoCloseable
192     // *************
193
194     @Override
195     public void close() throws Exception {
196         if (listenerReg != null)
197             listenerReg.close();
198         EndpointRpcRegistry.unregister(endpointRpcAug);
199     }
200
201     // ******************
202     // DataChangeListener
203     // ******************
204
205     @Override
206     public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
207         for (DataObject dao : change.getCreatedData().values()) {
208             if (dao instanceof Endpoint)
209                 updateEndpoint(null, dao);
210         }
211         for (InstanceIdentifier<?> iid : change.getRemovedPaths()) {
212             DataObject old = change.getOriginalData().get(iid);
213             if (old != null && old instanceof Endpoint)
214                 updateEndpoint(old, null);
215         }
216         Map<InstanceIdentifier<?>, DataObject> d = change.getUpdatedData();
217         for (Entry<InstanceIdentifier<?>, DataObject> entry : d.entrySet()) {
218             if ((!(entry.getValue() instanceof Endpoint)) && (!(entry.getValue() instanceof EndpointL3)))
219                 continue;
220             DataObject old = change.getOriginalData().get(entry.getKey());
221             DataObject oldEp = null;
222             if (entry instanceof Endpoint || entry instanceof EndpointL3) {
223                 if (old != null && old instanceof Endpoint)
224                     oldEp = old;
225                 updateEndpoint(oldEp, entry.getValue());
226             }
227         }
228     }
229
230     // **************
231     // Implementation
232     // **************
233
234     private Identity getIdentity(DataObject obj) {
235         Identity id = null;
236         if (obj instanceof Endpoint) {
237             Endpoint ep = (Endpoint) obj;
238             id = new Identity(ep);
239             id.setContext(ep.getL2Context().getValue());
240         }
241
242         if (obj instanceof EndpointL3) {
243             EndpointL3 ep = (EndpointL3) obj;
244             id = new Identity(ep);
245             id.setContext(ep.getL3Context().getValue());
246         }
247         if (id != null && !id.valid()) {
248             return null;
249         }
250         return id;
251     }
252
253     private synchronized Set<String> getEpSubscriptions(String id) {
254         return epSubscriptions.get(id);
255     }
256
257     /**
258      * Provide endpoint policy update messages based on changes
259      */
260     protected void updateEndpoint(DataObject oldEp, DataObject newEp) {
261         Identity oldId = getIdentity(oldEp);
262         Identity newId = getIdentity(newEp);
263         /*
264          * If an endpoint has changed, we need to provide notifications to
265          * agents that have subscribed to that endpoint. Batch up the
266          * notifications to be sent to the agents.
267          */
268         Queue<EndpointUpdate> updateQ = new ConcurrentLinkedQueue<EndpointUpdate>();
269
270         /* This covers additions or updates */
271         if (newId != null) {
272             Set<String> agentList = getEpSubscriptions(newId.identityAsString());
273             if (agentList != null) {
274                 for (String agentId : agentList) {
275                     OpflexAgent agent = connectionService.getOpflexAgent(agentId);
276                     if (agent != null) {
277                         updateQ.add(new EndpointUpdate(EndpointUpdate.UpdateType.ADD_CHANGE, agent.getEndpoint(), newEp));
278                     }
279                 }
280             }
281         }
282         /* this covers deletions */
283         if ((newId == null) && (oldId != null)) {
284             Set<String> agentList = getEpSubscriptions(oldId.identityAsString());
285             if (agentList != null) {
286                 for (String agentId : agentList) {
287                     OpflexAgent agent = connectionService.getOpflexAgent(agentId);
288                     if (agent != null) {
289                         updateQ.add(new EndpointUpdate(EndpointUpdate.UpdateType.DELETE, agent.getEndpoint(), oldEp));
290                     }
291                 }
292             }
293         }
294
295         sendEpUpdates(updateQ);
296     }
297
298     private static class EndpointUpdate implements Runnable {
299
300         public static enum UpdateType {
301             ADD_CHANGE("add_change"), DELETE("delete");
302
303             private final String updateType;
304
305             UpdateType(String updateType) {
306                 this.updateType = updateType;
307             }
308
309             @Override
310             public String toString() {
311                 return this.updateType;
312             }
313         }
314
315         private final UpdateType type;
316         private final JsonRpcEndpoint agent;
317         private final ManagedObject mo;
318
319         EndpointUpdate(UpdateType type, JsonRpcEndpoint agent, DataObject obj) {
320             this.type = type;
321             this.agent = agent;
322             mo = MessageUtils.getMoFromEp(obj);
323         }
324
325         @Override
326         public void run() {
327             EndpointUpdateRequest request = new EndpointUpdateRequest();
328             EndpointUpdateRequest.Params params = new EndpointUpdateRequest.Params();
329             List<EndpointUpdateRequest.Params> paramList = new ArrayList<EndpointUpdateRequest.Params>();
330
331             // TODO: how do we get delete URIs from the
332             // normalized policy?
333             List<Uri> delete_uri = new ArrayList<Uri>();
334             List<ManagedObject> replace = new ArrayList<ManagedObject>();
335             if (mo != null) {
336                 replace.add(mo);
337                 delete_uri.add(mo.getUri());
338             }
339             if (type == EndpointUpdate.UpdateType.ADD_CHANGE) {
340                 params.setReplace(replace);
341             } else if (type == EndpointUpdate.UpdateType.DELETE) {
342                 params.setDelete_uri(delete_uri);
343             }
344
345             paramList.add(params);
346             request.setParams(paramList);
347             try {
348                 agent.sendRequest(request);
349             } catch (Exception e) {
350
351             }
352
353         }
354
355     }
356
357     private void sendEpUpdates(Queue<EndpointUpdate> updateQ) {
358         while (!updateQ.isEmpty()) {
359             executor.execute(updateQ.remove());
360         }
361     }
362
363     /**
364      * Create an Endpoint Registry Context for an OpFlex Request message.
365      *
366      * @param agent
367      * @param message
368      * @param dataProvider
369      * @param executor
370      * @return
371      */
372     public EprContext create(JsonRpcEndpoint agent, RpcMessage message, DataBroker dataProvider,
373             ScheduledExecutorService executor) {
374
375         EprContext ec = null;
376
377         if (message instanceof EndpointDeclareRequest) {
378             EndpointDeclareRequest request = (EndpointDeclareRequest) message;
379             /*
380              * There theoretically could be a list of parameters, but we'll
381              * likely only ever see one element.
382              */
383             ec = new EprContext(agent, request, dataProvider, executor);
384             for (EndpointDeclareRequest.Params params : request.getParams()) {
385
386                 int prr = params.getPrr();
387
388                 /*
389                  * We have a list of endpoints, so iterate through the list and
390                  * register each one, extracting the identities for
391                  * registration.
392                  */
393                 List<ManagedObject> endpoints = params.getEndpoint();
394                 if (endpoints != null) {
395                     for (ManagedObject mo : endpoints) {
396                         EprOperation eo = MessageUtils.getEprOpFromEpMo(mo, prr, agent.getIdentifier());
397                         ec.addOperation(eo);
398                     }
399                 }
400             }
401         } else if (message instanceof EndpointUndeclareRequest) {
402             EndpointUndeclareRequest request = (EndpointUndeclareRequest) message;
403             ec = new EprContext(agent, request, dataProvider, executor);
404             for (EndpointUndeclareRequest.Params params : request.getParams()) {
405
406                 /*
407                  * A single URI is provided per param in Undeclare messages
408                  */
409                 String subject = params.getSubject();
410                 Uri uri = params.getEndpoint_uri();
411                 if (uri != null) {
412                     EprOperation op = MessageUtils.getEprOpFromUri(uri, subject);
413                     ec.addOperation(op);
414                 }
415             }
416         } else if (message instanceof EndpointResolveRequest) {
417             EndpointResolveRequest request = (EndpointResolveRequest) message;
418             ec = new EprContext(agent, request, dataProvider, executor);
419             for (EndpointResolveRequest.Params params : request.getParams()) {
420                 /*
421                  * The resolve message contains either the URI or a context/URI
422                  * and an identifier. There is only one of these per param array
423                  * entry.
424                  */
425                 EndpointIdentity eid = params.getEndpoint_ident();
426
427                 String subject = params.getSubject();
428
429                 if (eid != null) {
430
431                     EprOperation op = MessageUtils.getEprOpFromEpId(eid, subject);
432                     ec.addOperation(op);
433
434                 } else {
435                     /*
436                      * Extract the list to add the EP to from the URI
437                      */
438                     Uri uri = params.getEndpoint_uri();
439                     if (uri != null) {
440                         EprOperation op = MessageUtils.getEprOpFromUri(uri, subject);
441                         ec.addOperation(op);
442                     }
443                 }
444             }
445         }
446         return ec;
447     }
448
449     private synchronized void addEpSubscription(JsonRpcEndpoint agent, String id) {
450         Set<String> agents = epSubscriptions.get(id);
451         if (agents == null) {
452             agents = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
453             Set<String> result = epSubscriptions.putIfAbsent(id, agents);
454             if (result != null) {
455                 agents = result;
456             }
457         }
458         agents.add(agent.getIdentifier());
459     }
460
461     private synchronized void removeEpSubscription(JsonRpcEndpoint agent, String id) {
462         Set<String> agents = epSubscriptions.get(id);
463         if (agents != null) {
464             agents.remove(id);
465         }
466     }
467
468     /**
469      * This notification handles the OpFlex Endpoint messages. We should only
470      * receive request messages. Responses are sent in a different context, as
471      * all requests result in a Future to access the data store.
472      *
473      * @param agent
474      *            The JsonRpcEndpoint that received the request
475      * @param request
476      *            The request message from the endpoint
477      */
478     @Override
479     public void callback(JsonRpcEndpoint agent, RpcMessage request) {
480
481         if (messageMap.get(request.getMethod()) == null) {
482             LOG.warn("message {} was not subscribed to, but was delivered.", request);
483             return;
484         }
485
486         /*
487          * For declaration requests, we need to make sure that this EP is in our
488          * registry. Since we can have multiple identifiers, we create a Set of
489          * endpoints.
490          */
491
492         if (request instanceof EndpointDeclareRequest) {
493             EndpointDeclareRequest req = (EndpointDeclareRequest) request;
494
495             /*
496              * valid() ensures presence of params and MOs, so we know those
497              * won't be null
498              */
499             if (!req.valid()) {
500                 LOG.warn("Invalid declaration request: {}", req);
501                 // TODO: should return error reply?
502                 return;
503             }
504
505             /*
506              * OpFlex EP declaration/registration is different from REST EP
507              * declaration/registration -- REST only allows a single EP to be
508              * registered at a time. Since each MO represents an Endpoint that's
509              * being declared, we need add each one up separately,yet provide a
510              * single response. We also want to know the result of the
511              * registration so we can provide the appropriate response. We
512              * create a context for the Endpoint Registry interaction, where we
513              * can track the status of all the EP registrations, and provide a
514              * response when all have completed.
515              */
516             EprContext ctx = create(agent, req, dataProvider, executor);
517             ctx.setCallback(this);
518             ctx.createEp();
519         } else if (request instanceof EndpointUndeclareRequest) {
520             EndpointUndeclareRequest req = (EndpointUndeclareRequest) request;
521
522             /*
523              * valid() ensures presence of params and URIs, so we know those
524              * won't be null
525              */
526             if (!req.valid()) {
527                 LOG.warn("Invalid declaration request: {}", req);
528                 // TODO: should return error reply?
529                 return;
530             }
531
532             /*
533              * OpFlex EP undeclaration/unregistration is different from REST EP
534              * declaration/registration -- REST only allows a single EP to be
535              * unregistered at a time. Since each MO represents an Endpoint
536              * that's being undeclared, we need add each one up separately,yet
537              * provide a single response. We also want to know the result of the
538              * unregistration so we can provide the appropriate response. We
539              * create a context for the Endpoint Registry interaction, where we
540              * can track the status of all the EP unregistrations, and provide a
541              * response when all have completed.
542              */
543             EprContext ctx = create(agent, req, dataProvider, executor);
544             ctx.setCallback(this);
545             ctx.deleteEp();
546         } else if (request instanceof EndpointResolveRequest) {
547             EndpointResolveRequest req = (EndpointResolveRequest) request;
548
549             if (!req.valid()) {
550                 LOG.warn("Invalid endpoint request: {}", req);
551                 // TODO: should return error reply?
552                 return;
553             }
554             List<EndpointResolveRequest.Params> paramList = req.getParams();
555
556             for (EndpointResolveRequest.Params param : paramList) {
557                 EprContext ctx = create(agent, req, dataProvider, executor);
558
559                 /*
560                  * We query the EPR for the EP. This is an asynchronous
561                  * operation, so we send the response in the callback
562                  */
563                 ctx.setCallback(this);
564                 ctx.lookupEndpoint();
565
566                 /*
567                  * A request is effectively a subscription. Add this agent to
568                  * the set of listeners.
569                  */
570                 Identity id;
571                 if (param.getEndpoint_ident() != null) {
572                     id = new Identity(param.getEndpoint_ident().getIdentifier());
573                 } else if (param.getEndpoint_uri() != null) {
574                     PolicyUri puri = new PolicyUri(param.getEndpoint_uri().getValue());
575                     id = new Identity(puri.pop());
576                 } else {
577                     // TOOD: should return error reply
578                     return;
579                 }
580                 addEpSubscription(agent, id.identityAsString());
581             }
582         } else if (request instanceof EndpointUnresolveRequest) {
583             EndpointUnresolveRequest req = (EndpointUnresolveRequest) request;
584
585             if (!req.valid()) {
586                 LOG.warn("Invalid endpoint request: {}", req);
587                 // TODO: should return error reply?
588                 return;
589             }
590
591             List<EndpointUnresolveRequest.Params> params = ((EndpointUnresolveRequest) request).getParams();
592
593             for (EndpointUnresolveRequest.Params param : params) {
594                 /*
595                  * No interaction with the Data Store is required -- just cancel
596                  * the notification subscription for this EP..
597                  */
598                 Identity id = null;
599                 if (param.getEndpoint_ident() != null) {
600                     id = new Identity(param.getEndpoint_ident().getIdentifier());
601                 } else if (param.getEndpoint_uri() != null) {
602                     PolicyUri puri = new PolicyUri(param.getEndpoint_uri().getValue());
603                     id = new Identity(puri.pop());
604                 } else {
605                     // TOODO: should return an error
606                     return;
607                 }
608                 removeEpSubscription(agent, id.identityAsString());
609             }
610
611             /*
612              * No EprContext is used in unresolve -- so just send the response
613              * directly
614              */
615             EndpointUnresolveResponse resp = new EndpointUnresolveResponse();
616             EndpointUnresolveResponse.Result result = new EndpointUnresolveResponse.Result();
617             resp.setResult(result);
618             resp.setId(req.getId());
619             try {
620                 agent.sendResponse(resp);
621             } catch (Exception e) {
622                 LOG.warn("Response {} could not be sent to {}", resp, agent);
623             }
624
625         } else {
626             LOG.warn("Unexpected callback, {}", request);
627         }
628
629     }
630
631     private class EndpointResponse implements Runnable {
632
633         private final EprContext ctx;
634         private final RpcMessage resp;
635
636         public EndpointResponse(EprContext ctx, RpcMessage resp) {
637             this.ctx = ctx;
638             this.resp = resp;
639         }
640
641         @Override
642         public void run() {
643             try {
644                 ctx.getPeer().sendResponse(resp);
645             } catch (Exception e) {
646                 // TODO: what to do here
647             }
648
649         }
650
651     }
652
653     /**
654      * This notification handles the callback from an interaction with the
655      * Endpoint Registry. The context for the callback is a notification from
656      * the data store, so so the code has to ensure that it won't block.
657      * Responses are sent using an executor
658      */
659     @Override
660     public void callback(EprContext ctx) {
661         RpcMessage resp = null;
662         if (ctx.getRequest() == null)
663             return;
664
665         if (!(ctx.getRequest() instanceof EndpointDeclareRequest)
666                 && !(ctx.getRequest() instanceof EndpointUndeclareRequest)
667                 && !(ctx.getRequest() instanceof EndpointResolveRequest)) {
668             return;
669         }
670
671         if (ctx.getRequest() instanceof EndpointDeclareRequest) {
672             EndpointDeclareRequest req = (EndpointDeclareRequest) ctx.getRequest();
673             EndpointDeclareResponse response = new EndpointDeclareResponse();
674             EndpointDeclareResponse.Result result = new EndpointDeclareResponse.Result();
675             response.setResult(result);
676             response.setId(req.getId());
677             response.setError(null); // TODO: real errors
678             resp = response;
679         } else if (ctx.getRequest() instanceof EndpointUndeclareRequest) {
680             EndpointUndeclareRequest req = (EndpointUndeclareRequest) ctx.getRequest();
681             EndpointUndeclareResponse response = new EndpointUndeclareResponse();
682             EndpointUndeclareResponse.Result result = new EndpointUndeclareResponse.Result();
683             response.setResult(result);
684             response.setId(req.getId());
685             response.setError(null); // TODO: real errors
686             resp = response;
687         } else {
688             EndpointResolveRequest req = (EndpointResolveRequest) ctx.getRequest();
689             EndpointResolveResponse response = new EndpointResolveResponse();
690             response.setId(req.getId());
691             EndpointResolveResponse.Result result = new EndpointResolveResponse.Result();
692             List<ManagedObject> epList = new ArrayList<ManagedObject>();
693
694             if (ctx.getOperations().size() > 0) {
695
696                 /*
697                  * If we get any EP, then we can provide a response to the
698                  * original request Note that we could potentially have multiple
699                  * requests outstanding for the same EP, and even using
700                  * different context types (L2 or L3).
701                  */
702                 for (EprOperation op : ctx.getOperations()) {
703
704                     ManagedObject mo = MessageUtils.getMoFromOp(op);
705                     if (mo != null) {
706                         epList.add(mo);
707                     }
708                     /*
709                      * For EPs on a different agent, we need to look up the VTEP
710                      * information. For now, we're only supporting VXLAN VTEPs,
711                      * so we look up the destination tunnel IP, and provide that
712                      * in the data field of the response
713                      */
714                     // TODO: Need to look this up in op store
715                     // endpoint.setData();
716                 }
717                 result.setEndpoint(epList);
718                 response.setResult(result);
719                 resp = response;
720             }
721         }
722         if (resp != null) {
723             executor.execute(new EndpointResponse(ctx, resp));
724         }
725     }
726 }