Added range type to subject-feature-definition/parameter
[groupbasedpolicy.git] / renderers / opflex / src / main / java / org / opendaylight / groupbasedpolicy / renderer / opflex / EndpointManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.opflex;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Map.Entry;
16 import java.util.Queue;
17 import java.util.Set;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.ScheduledExecutorService;
21
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
28 import org.opendaylight.groupbasedpolicy.endpoint.AbstractEndpointRegistry;
29 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
30 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
31 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
32 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
33 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent;
35 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService;
36 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role;
37 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointDeclareRequest;
38 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointDeclareResponse;
39 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointIdentity;
40 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointResolveResponse;
41 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUndeclareRequest;
42 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUndeclareResponse;
43 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUnresolveRequest;
44 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUnresolveResponse;
45 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointUpdateRequest;
46 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.EndpointResolveRequest;
47 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.ManagedObject;
48 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.MitLib;
49 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.PolicyUri;
50 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContext;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextL3;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.OpflexOverlayContextL3Builder;
62 import org.opendaylight.yangtools.concepts.ListenerRegistration;
63 import org.opendaylight.yangtools.yang.binding.DataObject;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 /**
69  * Keep track of endpoints on the system.  Maintain an index of endpoints
70  * and their locations for queries from agents.  The endpoint manager will maintain
71  * appropriate indexes only for agents that are attached to the current
72  * controller node.
73  *
74  * In order to render the policy, we need to be able to efficiently enumerate
75  * all endpoints on a particular agent and also all the agents containing
76  * each particular endpoint group
77  * @author tbachman
78  */
79 public class EndpointManager
80         extends AbstractEndpointRegistry
81         implements AutoCloseable, DataChangeListener, RpcBroker.RpcCallback,
82         EprContext.EprCtxCallback {
83     protected static final Logger LOG =
84             LoggerFactory.getLogger(EndpointManager.class);
85
86     private static final InstanceIdentifier<Endpoint> endpointsIid =
87             InstanceIdentifier.builder(Endpoints.class)
88                 .child(Endpoint.class).build();
89     private static final InstanceIdentifier<EndpointL3> endpointsL3Iid =
90             InstanceIdentifier.builder(Endpoints.class)
91                 .child(EndpointL3.class).build();
92
93     final ListenerRegistration<DataChangeListener> listenerReg;
94     final ListenerRegistration<DataChangeListener> listenerL3Reg;
95
96     private final OpflexConnectionService connectionService;
97     private final MitLib mitLibrary;
98
99     final ConcurrentHashMap<EpKey, Endpoint> endpoints;
100
101     private ConcurrentHashMap<String, Set<String>> epSubscriptions;
102     private RpcMessageMap messageMap = null;
103
104     public EndpointManager(DataBroker dataProvider,
105                            RpcProviderRegistry rpcRegistry,
106                            ScheduledExecutorService executor,
107                            OpflexConnectionService connectionService,
108                            MitLib opflexLibrary) {
109         super(dataProvider, rpcRegistry, executor);
110
111         if (dataProvider != null) {
112             listenerReg = dataProvider
113                     .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
114                                                 endpointsIid,
115                                                 this,
116                                                 DataChangeScope.ONE);
117             listenerL3Reg = dataProvider
118                     .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
119                                                 endpointsL3Iid,
120                                                 this,
121                                                 DataChangeScope.ONE);
122         } else {
123             listenerReg = null;
124             listenerL3Reg = null;
125         }
126
127         this.connectionService = connectionService;
128         this.mitLibrary = opflexLibrary;
129
130         endpoints = new ConcurrentHashMap<EpKey, Endpoint>();
131         epSubscriptions = new ConcurrentHashMap<String, Set<String>>();
132
133         /* Subscribe to EPR messages */
134         messageMap = new RpcMessageMap();
135         List<RpcMessage> messages = Role.ENDPOINT_REGISTRY.getMessages();
136         messageMap.addList(messages);
137         for (RpcMessage msg: messages) {
138             this.connectionService.subscribe(msg, this);
139         }
140         LOG.trace("Initialized OpFlex endpoint manager");
141     }
142
143     /**
144      * Shut down the {@link EndpointManager}
145      *
146      */
147     public void shutdown() {
148
149     }
150
151     // ***************
152     // EndpointManager
153     // ***************
154
155     /**
156      * Get the endpoint object for the given key
157      * @param epKey the key
158      * @return the {@link Endpoint} corresponding to the key
159      */
160     public Endpoint getEndpoint(EpKey epKey) {
161         return endpoints.get(epKey);
162     }
163
164     // ************************
165     // AbstractEndpointRegistry
166     // ************************
167
168     @Override
169     protected EndpointBuilder buildEndpoint(RegisterEndpointInput input) {
170         OpflexOverlayContextInput ictx =
171                 input.getAugmentation(OpflexOverlayContextInput.class);
172         return super.buildEndpoint(input)
173                 .addAugmentation(OpflexOverlayContext.class,
174                                  new OpflexOverlayContextBuilder(ictx).build());
175     }
176
177     @Override
178     protected EndpointL3Builder buildEndpointL3(RegisterEndpointInput input) {
179         OpflexOverlayContextInput ictx =
180                 input.getAugmentation(OpflexOverlayContextInput.class);
181         return super.buildEndpointL3(input)
182                 .addAugmentation(OpflexOverlayContextL3.class,
183                                  new OpflexOverlayContextL3Builder(ictx).build());
184     }
185
186     // *************
187     // AutoCloseable
188     // *************
189
190     @Override
191     public void close() throws Exception {
192         if (listenerReg != null) listenerReg.close();
193         super.close();
194     }
195
196     // ******************
197     // DataChangeListener
198     // ******************
199
200     @Override
201     public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
202         for (DataObject dao : change.getCreatedData().values()) {
203             if (dao instanceof Endpoint)
204                 updateEndpoint(null, dao);
205         }
206         for (InstanceIdentifier<?> iid : change.getRemovedPaths()) {
207             DataObject old = change.getOriginalData().get(iid);
208             if (old != null && old instanceof Endpoint)
209                 updateEndpoint(old, null);
210         }
211         Map<InstanceIdentifier<?>,DataObject> d = change.getUpdatedData();
212         for (Entry<InstanceIdentifier<?>, DataObject> entry : d.entrySet()) {
213             if ((!(entry.getValue() instanceof Endpoint)) &&
214                 (!(entry.getValue() instanceof EndpointL3))) continue;
215             DataObject old = change.getOriginalData().get(entry.getKey());
216             DataObject oldEp = null;
217             if (entry instanceof Endpoint ||
218                 entry instanceof EndpointL3) {
219                 if (old != null && old instanceof Endpoint)
220                     oldEp = old;
221                 updateEndpoint(oldEp, entry.getValue());
222             }
223         }
224     }
225     // **************
226     // Implementation
227     // **************
228
229     private Identity getIdentity(DataObject obj) {
230         Identity id = null;
231         if (obj instanceof Endpoint) {
232             Endpoint ep = (Endpoint)obj;
233             id = new Identity(ep);
234             id.setContext(ep.getL2Context().getValue());
235         }
236
237         if (obj instanceof EndpointL3) {
238             EndpointL3 ep = (EndpointL3)obj;
239             id = new Identity(ep);
240             id.setContext(ep.getL3Context().getValue());
241         }
242         if (id != null && !id.valid()) {
243             return null;
244         }
245         return id;
246     }
247
248     private synchronized Set<String> getEpSubscriptions(String id) {
249         return epSubscriptions.get(id);
250     }
251
252     /**
253      *  Provide endpoint policy update messages based on changes
254      */
255     protected void updateEndpoint(DataObject oldEp, DataObject newEp) {
256         Identity oldId = getIdentity(oldEp);
257         Identity newId = getIdentity(newEp);
258         /*
259          * If an endpoint has changed, we need to provide notifications
260          * to agents that have subscribed to that endpoint. Batch up
261          * the notifications to be sent to the agents.
262          */
263         Queue<EndpointUpdate> updateQ = new ConcurrentLinkedQueue<EndpointUpdate>();
264
265         /* This covers additions or updates */
266         if (newId != null) {
267             Set<String> agentList = getEpSubscriptions(newId.identityAsString());
268             if (agentList != null) {
269                 for (String agentId : agentList) {
270                     OpflexAgent agent = connectionService.getOpflexAgent(agentId);
271                     if (agent != null) {
272                         updateQ.add(new EndpointUpdate(EndpointUpdate.UpdateType.ADD_CHANGE,
273                                     agent.getEndpoint(),
274                                     newEp));
275                     }
276                 }
277             }
278         }
279         /* this covers deletions */
280         if ((newId == null) && (oldId != null)) {
281             Set<String> agentList = getEpSubscriptions(oldId.identityAsString());
282             if (agentList != null) {
283                 for (String agentId : agentList) {
284                     OpflexAgent agent = connectionService.getOpflexAgent(agentId);
285                     if (agent != null) {
286                         updateQ.add(new EndpointUpdate(EndpointUpdate.UpdateType.DELETE,
287                                     agent.getEndpoint(),
288                                     oldEp));
289                     }
290                 }
291             }
292         }
293
294         sendEpUpdates(updateQ);
295     }
296
297     private static class EndpointUpdate implements Runnable {
298         public static enum UpdateType {
299                 ADD_CHANGE("add_change"),
300                 DELETE("delete");
301
302                 private String updateType;
303
304                 UpdateType(String updateType) {
305                         this.updateType = updateType;
306                 }
307
308                 @Override
309                 public String toString() {
310                         return this.updateType;
311                 }
312         }
313
314         private final UpdateType type;
315         private final JsonRpcEndpoint agent;
316         private final ManagedObject mo;
317         EndpointUpdate(UpdateType type, JsonRpcEndpoint agent, DataObject obj) {
318                 this.type = type;
319             this.agent = agent;
320             mo = MessageUtils.getMoFromEp(obj);
321         }
322
323         @Override
324         public void run() {
325             EndpointUpdateRequest request =
326                     new EndpointUpdateRequest();
327             EndpointUpdateRequest.Params params =
328                     new EndpointUpdateRequest.Params();
329             List<EndpointUpdateRequest.Params> paramList =
330                     new ArrayList<EndpointUpdateRequest.Params>();
331
332             // TODO: how do we get delete URIs from the
333             // normalized policy?
334             List<Uri> delete_uri = new ArrayList<Uri>();
335             List<ManagedObject> replace = new ArrayList<ManagedObject>();
336             if (mo != null) {
337                 replace.add(mo);
338                 delete_uri.add(mo.getUri());
339             }
340             if (type == EndpointUpdate.UpdateType.ADD_CHANGE) {
341                 params.setReplace(replace);
342             }
343             else if (type == EndpointUpdate.UpdateType.DELETE) {
344                 params.setDelete_uri(delete_uri);
345             }
346
347             paramList.add(params);
348             request.setParams(paramList);
349             try {
350                 agent.sendRequest(request);
351             }
352             catch (Throwable t) {
353
354             }
355
356         }
357
358     }
359
360     private void sendEpUpdates(Queue<EndpointUpdate> updateQ) {
361         while (!updateQ.isEmpty()) {
362             executor.execute(updateQ.remove());
363         }
364     }
365
366         /**
367      * Create an Endpoint Registry Context for an OpFlex
368      * Request message.
369      *
370          * @param agent
371          * @param request
372          * @param dataProvider
373          * @param executor
374          * @return
375          */
376     public EprContext create(JsonRpcEndpoint agent,
377                              RpcMessage message,
378                              DataBroker dataProvider,
379                              ScheduledExecutorService executor) {
380
381         EprContext ec = null;
382
383         if (message instanceof EndpointDeclareRequest) {
384                 EndpointDeclareRequest request = (EndpointDeclareRequest)message;
385                 /*
386                  * There theoretically could be a list of parameters,
387                  * but we'll likely only ever see one element.
388                  */
389                 ec = new EprContext(agent, request, dataProvider, executor);
390                 for (EndpointDeclareRequest.Params params : request.getParams()) {
391
392                         int prr = params.getPrr();
393
394                     /*
395                          * We have a list of endpoints, so iterate through the
396                          * list and register each one, extracting the identities
397                          * for registration.
398                          */
399                         List<ManagedObject> endpoints = params.getEndpoint();
400                         if (endpoints != null) {
401                                 for (ManagedObject mo: endpoints) {
402                                         EprOperation eo =
403                                                 MessageUtils.getEprOpFromEpMo(mo, prr, agent.getIdentifier());
404                                         ec.addOperation(eo);
405                                 }
406                         }
407                 }
408         }
409         else if (message instanceof EndpointUndeclareRequest) {
410                 EndpointUndeclareRequest request = (EndpointUndeclareRequest)message;
411                 ec = new EprContext(agent, request, dataProvider, executor);
412                 for (EndpointUndeclareRequest.Params params : request.getParams()) {
413
414                         /*
415                          * A single URI is provided per param in Undeclare messages
416                          */
417                         String subject = params.getSubject();
418                 Uri uri = params.getEndpoint_uri();
419                 if (uri != null) {
420                         EprOperation op = MessageUtils.getEprOpFromUri(uri, subject);
421                         ec.addOperation(op);
422                 }
423                 }
424         }
425         else if (message instanceof EndpointResolveRequest) {
426                 EndpointResolveRequest request = (EndpointResolveRequest)message;
427                 ec = new EprContext(agent, request, dataProvider, executor);
428                 for (EndpointResolveRequest.Params params: request.getParams()) {
429                 /*
430                  * The resolve message contains either the URI
431                  * or a context/URI and an identifier. There is only
432                  * one of these per param array entry.
433                  */
434                 EndpointIdentity eid = params.getEndpoint_ident();
435
436                 String subject = params.getSubject();
437
438                 if (eid != null) {
439
440                         EprOperation op = MessageUtils.getEprOpFromEpId(eid, subject);
441                         ec.addOperation(op);
442
443                 } else {
444                     /*
445                      * Extract the list to add the EP to from
446                      * the URI
447                      */
448                     Uri uri = params.getEndpoint_uri();
449                         if (uri != null) {
450                                 EprOperation op = MessageUtils.getEprOpFromUri(uri, subject);
451                                 ec.addOperation(op);
452                         }
453                 }
454                 }
455         }
456         return ec;
457     }
458
459     private synchronized void addEpSubscription(JsonRpcEndpoint agent, String id) {
460         Set<String> agents = epSubscriptions.get(id);
461         if (agents == null) {
462             agents = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
463             Set<String> result = epSubscriptions.putIfAbsent(id, agents);
464             if (result != null) {
465                 agents = result;
466             }
467         }
468         agents.add(agent.getIdentifier());
469     }
470
471     private synchronized void removeEpSubscription(JsonRpcEndpoint agent, String id) {
472         Set<String> agents = epSubscriptions.get(id);
473         if (agents != null) {
474                 agents.remove(id);
475         }
476     }
477
478     /**
479      * This notification handles the OpFlex Endpoint messages.
480      * We should only receive request messages. Responses are
481      * sent in a different context, as all requests result
482      * in a Future to access the data store.
483      *
484      * @param agent The JsonRpcEndpoint that received the request
485      * @param request The request message from the endpoint
486      */
487     @Override
488     public void callback(JsonRpcEndpoint agent, RpcMessage request) {
489
490         if (messageMap.get(request.getMethod()) == null) {
491             LOG.warn("message {} was not subscribed to, but was delivered.", request);
492             return;
493         }
494
495         /*
496          * For declaration requests, we need to make sure that this
497          * EP is in our registry. Since we can have multiple identifiers,
498          * we create a Set of endpoints.
499          */
500
501         if (request instanceof EndpointDeclareRequest) {
502             EndpointDeclareRequest req = (EndpointDeclareRequest)request;
503
504
505             /*
506              * valid() ensures presence of params and MOs, so we know those
507              * won't be null
508              */
509             if (!req.valid()) {
510                 LOG.warn("Invalid declaration request: {}", req);
511                 // TODO: should return error reply?
512                 return;
513             }
514
515             /*
516              * OpFlex EP declaration/registration is different from
517              * REST EP declaration/registration -- REST only allows
518              * a single EP to be registered at a time. Since each MO
519              * represents an Endpoint that's being declared, we need
520              * add each one up separately,yet provide a single response.
521              * We also want to know the result of the registration so
522              * we can provide the appropriate response. We create a
523              * context for the Endpoint Registry interaction, where
524              * we can track the status of all the EP registrations,
525              * and provide a response when all have completed.
526              */
527             EprContext ctx = create(agent, req, dataProvider, executor);
528             ctx.setCallback(this);
529             ctx.createEp();
530         }
531         else if (request instanceof EndpointUndeclareRequest) {
532             EndpointUndeclareRequest req = (EndpointUndeclareRequest)request;
533
534             /*
535              * valid() ensures presence of params and URIs, so we know those
536              * won't be null
537              */
538             if (!req.valid()) {
539                 LOG.warn("Invalid declaration request: {}", req);
540                 // TODO: should return error reply?
541                 return;
542             }
543
544             /*
545              * OpFlex EP undeclaration/unregistration is different from
546              * REST EP declaration/registration -- REST only allows
547              * a single EP to be unregistered at a time. Since each MO
548              * represents an Endpoint that's being undeclared, we need
549              * add each one up separately,yet provide a single response.
550              * We also want to know the result of the unregistration so
551              * we can provide the appropriate response. We create a
552              * context for the Endpoint Registry interaction, where
553              * we can track the status of all the EP unregistrations,
554              * and provide a response when all have completed.
555              */
556             EprContext ctx = create(agent, req, dataProvider, executor);
557             ctx.setCallback(this);
558             ctx.deleteEp();
559         }
560         else if (request instanceof EndpointResolveRequest) {
561             EndpointResolveRequest req = (EndpointResolveRequest)request;
562
563             if (!req.valid()) {
564                 LOG.warn("Invalid endpoint request: {}", req);
565                 // TODO: should return error reply?
566                 return;
567             }
568             List<EndpointResolveRequest.Params> paramList =
569                         req.getParams();
570
571             for (EndpointResolveRequest.Params param: paramList) {
572                 EprContext ctx = create(agent, req, dataProvider, executor);
573
574                 /*
575                  * We query the EPR for the EP. This is an asynchronous
576                  * operation, so we send the response in the callback
577                  */
578                 ctx.setCallback(this);
579                 ctx.lookupEndpoint();
580
581                 /*
582                  * A request is effectively a subscription. Add this agent
583                  * to the set of listeners.
584                  */
585                 Identity id = null;
586                 if (param.getEndpoint_ident() != null) {
587                     id = new Identity(param.getEndpoint_ident().getIdentifier());
588                 }
589                 else if (param.getEndpoint_uri() != null) {
590                         PolicyUri puri = new PolicyUri(param.getEndpoint_uri().getValue());
591                     id = new Identity(puri.pop());
592                 }
593                 else {
594                         // TOOD: should return error reply
595                         return;
596                 }
597                 addEpSubscription(agent, id.identityAsString());
598             }
599         }
600         else if (request instanceof EndpointUnresolveRequest) {
601                 EndpointUnresolveRequest req = (EndpointUnresolveRequest)request;
602
603             if (!req.valid()) {
604                 LOG.warn("Invalid endpoint request: {}", req);
605                 // TODO: should return error reply?
606                 return;
607             }
608
609                 List<EndpointUnresolveRequest.Params> params =
610                                 ((EndpointUnresolveRequest) request).getParams();
611
612                 for (EndpointUnresolveRequest.Params param: params) {
613                 /*
614                  * No interaction with the Data Store is required -- just
615                  * cancel the notification subscription for this EP..
616                  */
617                 Identity id = null;
618                 if (param.getEndpoint_ident() != null) {
619                     id = new Identity(param.getEndpoint_ident().getIdentifier());
620                 }
621                 else if (param.getEndpoint_uri() != null) {
622                         PolicyUri puri = new PolicyUri(param.getEndpoint_uri().getValue());
623                     id = new Identity(puri.pop());
624                 }
625                 else {
626                         // TOODO: should return an error
627                         return;
628                 }
629                 removeEpSubscription(agent, id.identityAsString());
630                 }
631
632             /*
633              * No EprContext is used in unresolve -- so
634              * just send the response directly
635              */
636             EndpointUnresolveResponse resp =
637                     new EndpointUnresolveResponse();
638             EndpointUnresolveResponse.Result result =
639                     new EndpointUnresolveResponse.Result();
640             resp.setResult(result);
641             resp.setId(req.getId());
642             try {
643                 agent.sendResponse(resp);
644             }
645             catch (Throwable t) {
646                 LOG.warn("Response {} could not be sent to {}", resp, agent);
647             }
648
649         }
650         else {
651             LOG.warn("Unexpected callback, {}", request);
652         }
653
654     }
655
656     private class EndpointResponse implements Runnable {
657
658         private EprContext ctx;
659         private RpcMessage resp;
660         public EndpointResponse(EprContext ctx, RpcMessage resp) {
661             this.ctx = ctx;
662             this.resp = resp;
663         }
664
665         @Override
666         public void run() {
667             try {
668                 ctx.getPeer().sendResponse(resp);
669             } catch (Throwable t) {
670                 // TODO: what to do here
671             }
672
673         }
674
675     }
676
677     /**
678      * This notification handles the callback from an interaction
679      * with the Endpoint Registry. The context for the callback
680      * is a notification from the data store, so so the code
681      * has to ensure that it won't block. Responses are sent
682      * using an executor
683      */
684     @Override
685     public void callback(EprContext ctx) {
686         RpcMessage resp = null;
687         if (ctx.getRequest() == null) return;
688
689         if (!(ctx.getRequest() instanceof EndpointDeclareRequest) &&
690             !(ctx.getRequest() instanceof EndpointUndeclareRequest) &&
691             !(ctx.getRequest() instanceof EndpointResolveRequest)) {
692             return;
693         }
694
695         if (ctx.getRequest() instanceof EndpointDeclareRequest) {
696                 EndpointDeclareRequest req =
697                                 (EndpointDeclareRequest)ctx.getRequest();
698                 EndpointDeclareResponse response = new EndpointDeclareResponse();
699                 EndpointDeclareResponse.Result result =
700                                 new EndpointDeclareResponse.Result();
701             response.setResult(result);
702             response.setId(req.getId());
703             response.setError(null); // TODO: real errors
704             resp = response;
705         }
706         else if (ctx.getRequest() instanceof EndpointUndeclareRequest) {
707                 EndpointUndeclareRequest req =
708                                 (EndpointUndeclareRequest)ctx.getRequest();
709                 EndpointUndeclareResponse response = new EndpointUndeclareResponse();
710                 EndpointUndeclareResponse.Result result =
711                                 new EndpointUndeclareResponse.Result();
712             response.setResult(result);
713             response.setId(req.getId());
714             response.setError(null); // TODO: real errors
715             resp = response;
716         }
717         else {
718                 EndpointResolveRequest req =
719                         (EndpointResolveRequest)ctx.getRequest();
720                 EndpointResolveResponse response = new EndpointResolveResponse();
721                 response.setId(req.getId());
722                 EndpointResolveResponse.Result result =
723                         new EndpointResolveResponse.Result();
724                 List<ManagedObject> epList =
725                         new ArrayList<ManagedObject>();
726
727
728                 if (ctx.getOperations().size() > 0) {
729
730                     /*
731                      * If we get any EP, then we can
732                      * provide a response to the original request
733                      * Note that we could potentially have multiple
734                      * requests outstanding for the same EP, and
735                      * even using different context types (L2 or L3).
736                      */
737                         for (EprOperation op: ctx.getOperations()) {
738
739                         ManagedObject mo = MessageUtils.getMoFromOp(op);
740                         if (mo != null) {
741                             epList.add(mo);
742                         }
743                         /*
744                          * For EPs on a different agent, we need to look up the
745                          * VTEP information. For now, we're only supporting
746                          * VXLAN VTEPs, so we look up the destination tunnel IP,
747                          * and provide that in the data field of the response
748                          */
749                         // TODO: Need to look this up in op store
750                         //endpoint.setData();
751                     }
752                     result.setEndpoint(epList);
753                     response.setResult(result);
754                     resp = response;
755                 }
756         }
757         if (resp != null) {
758             executor.execute(new EndpointResponse(ctx, resp));
759         }
760     }
761 }