Feature uses features-parent as parent
[groupbasedpolicy.git] / renderers / opflex / src / main / java / org / opendaylight / groupbasedpolicy / renderer / opflex / lib / OpflexConnectionService.java
1 /*
2  * Copyright (C) 2013 Red Hat, Inc. Copyright (C) 2014 Cisco Systems, Inc.
3  * 
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  * 
8  * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller, Thomas Bachman
9  */
10 package org.opendaylight.groupbasedpolicy.renderer.opflex.lib;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CopyOnWriteArrayList;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
20
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
23 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
27 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.ConnectionService;
28 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.JsonRpcEndpoint;
29 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcBroker;
30 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessage;
31 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessageMap;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.IdentityRequest;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.IdentityResponse;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexError;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitionsBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistryBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.ObserverBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepositoryBuilder;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.binding.DataObject;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import com.google.common.base.Optional;
50 import com.google.common.util.concurrent.FutureCallback;
51 import com.google.common.util.concurrent.Futures;
52 import com.google.common.util.concurrent.ListenableFuture;
53
54 /**
55  * Manages the different OpFlex entity connections. It does this on behalf of
56  * each logical OpFlex entity: o Policy Repositories o Endpoint Registries o
57  * Observers
58  * Each OpFlex entity defines the JSON RPC methods supported, and manages their
59  * connection/discovery using dedicated servers. Servers and connections are
60  * maintained in dedicated client and server maps.
61  *
62  * @author tbachman
63  */
64 public class OpflexConnectionService implements ConnectionService, RpcBroker, RpcBroker.RpcCallback,
65         DataChangeListener, AutoCloseable {
66
67     protected static final Logger logger = LoggerFactory.getLogger(OpflexConnectionService.class);
68
69     public static final String OPFLEX_DOMAIN = "default";
70     static final String INVALID_DOMAIN = "Domain mismatch";
71     // Properties that can be set in config.ini
72     public static final String OPFLEX_LISTENPORT = "opflex.listenPort";
73     private static final Integer defaultOpflexPort = 6670;
74     public static final String OPFLEX_LISTENIP = "opflex.listenIp";
75     private static final String defaultOpflexIp = "0.0.0.0";
76
77     private Integer opflexListenPort = defaultOpflexPort;
78     private String opflexListenIp = defaultOpflexIp;
79
80     private final ScheduledExecutorService executor;
81     private final ListenerRegistration<DataChangeListener> dataListener;
82
83     String domain;
84     private final Map<String, OpflexAgent> opflexAgents = new ConcurrentHashMap<>();
85     private final Map<String, OpflexRpcServer> opflexServers = new ConcurrentHashMap<>();
86     private ConcurrentMap<String, List<RpcCallback>> brokerMap = null;
87
88     private DiscoveryDefinitions currentIdentities;
89     private final DataBroker dataProvider;
90     private final RpcMessageMap messageMap = new RpcMessageMap();
91
92     public static final InstanceIdentifier<DiscoveryDefinitions> DISCOVERY_IID = InstanceIdentifier.builder(
93             DiscoveryDefinitions.class).build();
94
95     public OpflexConnectionService(DataBroker salDataProvider, ScheduledExecutorService executor) {
96         this.dataProvider = salDataProvider;
97         this.executor = executor;
98
99         createBroker();
100
101         /* Subscribe to Discovery messages */
102         List<RpcMessage> messages = Role.DISCOVERY.getMessages();
103         this.messageMap.addList(messages);
104         for (RpcMessage msg : messages) {
105             this.subscribe(msg, this);
106         }
107
108         /*
109          * Check configuration to see which listeners we should be creating
110          */
111         int listenPort = defaultOpflexPort;
112         String portString = System.getProperty(OPFLEX_LISTENPORT);
113         if (portString != null) {
114             listenPort = Integer.decode(portString).intValue();
115         }
116         this.opflexListenPort = listenPort;
117         String listenIp = defaultOpflexIp;
118         String ipString = System.getProperty(OPFLEX_LISTENIP);
119         if (ipString != null) {
120             listenIp = ipString;
121         }
122         this.opflexListenIp = listenIp;
123
124         initializeServers();
125
126         this.dataListener = dataProvider.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
127                 OpflexConnectionService.DISCOVERY_IID, this, DataChangeScope.SUBTREE);
128     }
129
130     private List<OpflexRpcServer> setDefaultIdentities() {
131
132         /*
133          * Create a single server, filling all roles
134          */
135         String identity = opflexListenIp + ":" + opflexListenPort.toString();
136         List<OpflexRpcServer> srvList = new ArrayList<OpflexRpcServer>();
137         List<Role> roles = new ArrayList<Role>();
138         roles.add(Role.POLICY_REPOSITORY);
139         roles.add(Role.ENDPOINT_REGISTRY);
140         roles.add(Role.OBSERVER);
141
142         OpflexRpcServer srv = new OpflexRpcServer(domain, identity, roles);
143         srv.setConnectionService(this);
144         srv.setRpcBroker(this);
145         srvList.add(srv);
146         return srvList;
147
148     }
149
150     private List<OpflexRpcServer> createServerList(DiscoveryDefinitions identities) {
151
152         if (identities != null) {
153             Map<String, OpflexRpcServer> servers = new ConcurrentHashMap<String, OpflexRpcServer>();
154             List<String> addList = getPolicyRepositories(identities.getPolicyRepository());
155             addServerList(servers, addList, Role.POLICY_REPOSITORY);
156             addList = getEndpointRegistries(identities.getEndpointRegistry());
157             addServerList(servers, addList, Role.ENDPOINT_REGISTRY);
158             addList = getObservers(identities.getObserver());
159             addServerList(servers, addList, Role.OBSERVER);
160             return (new ArrayList<OpflexRpcServer>(servers.values()));
161         }
162         return null;
163     }
164
165     private void initializeServers() {
166         /*
167          * Get the configured identities, if any. If lists are empty, set up a
168          * single instance of each, using the default interface, all inside a
169          * default domain
170          */
171         domain = OPFLEX_DOMAIN;
172         readConfig();
173
174     }
175
176     private List<String> getPolicyRepositories(List<PolicyRepository> repositories) {
177         List<String> identityList = new ArrayList<String>();
178         if (repositories == null)
179             return null;
180         for (PolicyRepository pr : repositories) {
181             String identity = pr.getId() + ":" + pr.getPort().toString();
182             identityList.add(identity);
183         }
184         return identityList;
185     }
186
187     private List<String> getEndpointRegistries(List<EndpointRegistry> registries) {
188         List<String> identityList = new ArrayList<String>();
189         if (registries == null)
190             return null;
191         for (EndpointRegistry epr : registries) {
192             String identity = epr.getId() + ":" + epr.getPort().toString();
193             identityList.add(identity);
194         }
195         return identityList;
196     }
197
198     private List<String> getObservers(List<Observer> observers) {
199         List<String> identityList = new ArrayList<String>();
200         if (observers == null)
201             return null;
202         for (Observer o : observers) {
203             String identity = o.getId() + ":" + o.getPort().toString();
204             identityList.add(identity);
205         }
206         return identityList;
207     }
208
209     private void addServerList(Map<String, OpflexRpcServer> servers, List<String> idList, Role role) {
210         if (idList == null || idList.size() <= 0)
211             return;
212
213         for (String id : idList) {
214             List<Role> roles = new ArrayList<Role>();
215             OpflexRpcServer srv = servers.get(id);
216             if (srv != null) {
217                 roles = srv.getRoles();
218                 servers.remove(id);
219             }
220
221             roles.add(role);
222             srv = new OpflexRpcServer(domain, id, roles);
223             srv.setConnectionService(this);
224             srv.setRpcBroker(this);
225             servers.put(id, srv);
226         }
227
228     }
229
230     /**
231      * Find the {@link OpflexAgent} that owns this {@link JsonRpcEndpoint}.
232      *
233      * @param endpoint
234      *        The endpoint to look up
235      * @return The OpflexConnection that owns this endpoint
236      *         TODO: should throw an exception of there is no OpflexConnection
237      *         that contains this endpoint
238      */
239     public OpflexAgent getOpflexConnection(JsonRpcEndpoint endpoint) {
240
241         return getOpflexAgent(endpoint.getIdentifier());
242     }
243
244     /**
245      * Get the OpflexRpcServer that spawned this endpoint.
246      *
247      * @param endpoint
248      *        The endpoint to look up
249      * @return The OpflexRpcServer that owns this endpoint, or null if the
250      *         server no longer exists
251      *         TODO: exception if the endpoint is owned by anything
252      */
253     public OpflexRpcServer getOpflexServer(JsonRpcEndpoint endpoint) {
254         if (endpoint.getContext() instanceof OpflexRpcServer) {
255             return (OpflexRpcServer) endpoint.getContext();
256         }
257         logger.warn("Couldn't find OpflexConnection for endpoint {}", endpoint.getIdentifier());
258         return null;
259     }
260
261     /**
262      * Start the {@link OpflexConnectionService}
263      */
264     public synchronized void createBroker() {
265         brokerMap = new ConcurrentHashMap<String, List<RpcCallback>>();
266     }
267
268     public Map<String, OpflexAgent> getOpflexAgents() {
269         return opflexAgents;
270     }
271
272     public Map<String, OpflexRpcServer> getOpflexServers() {
273         return opflexServers;
274     }
275
276     public void removeOpflexAgent(OpflexAgent agent) {
277         opflexAgents.remove(agent.getIdentity());
278     }
279
280     public void removeOpflexServer(OpflexRpcServer server) {
281         opflexServers.remove(server.getId());
282     }
283
284     public List<OpflexRpcServer> getOpflexServerList() {
285         return new ArrayList<OpflexRpcServer>(opflexServers.values());
286     }
287
288     /**
289      * Clean up all the entities contained by this domain. The connection
290      * service also owns these references, so we provide notifications to the
291      * connection service so that it can clean up as well.
292      */
293     public void cleanup() {
294         List<String> agents = new ArrayList<String>(opflexAgents.keySet());
295         List<String> servers = new ArrayList<String>(opflexServers.keySet());
296         for (String agent : agents) {
297             OpflexAgent conn = opflexAgents.remove(agent);
298             conn.getEndpoint().getChannel().disconnect();
299         }
300         for (String srv : servers) {
301             OpflexRpcServer server = opflexServers.get(srv);
302             if (server.getRpcServer().getChannel() != null) {
303                 server.getRpcServer().getChannel().disconnect();
304             }
305         }
306     }
307
308     /**
309      * Add an {@link OpflexAgent} to the domain
310      *
311      * @param agent
312      *        The agent to add
313      */
314     public void addOpflexAgent(OpflexAgent agent) {
315         opflexAgents.put(agent.getIdentity(), agent);
316     }
317
318     /**
319      * Return the {@link OpflexAgent} associated with this identity
320      *
321      * @param identity
322      *        A string representing the connections identity
323      * @return The connection represented by that key, or null if not found
324      */
325     public OpflexAgent getOpflexAgent(String identity) {
326         return opflexAgents.get(identity);
327     }
328
329     /**
330      * Add the List of servers to the domain
331      *
332      * @param serverList
333      *        List of new servers to start
334      */
335     public void addServers(List<OpflexRpcServer> serverList) {
336
337         if (serverList == null)
338             return;
339
340         /*
341          * Check to see if there's already a server with this identity, and if
342          * so, close it and replace it with this one.
343          */
344         for (OpflexRpcServer srv : serverList) {
345             OpflexRpcServer server = opflexServers.get(srv.getId());
346             if (server != null) {
347                 if (!server.sameServer(srv)) {
348                     OpflexRpcServer oldServer = opflexServers.remove(srv.getId());
349                     if (oldServer != null && oldServer.getRpcServer() != null
350                             && oldServer.getRpcServer().getChannel() != null) {
351                         oldServer.getRpcServer().getChannel().disconnect();
352                     }
353                     opflexServers.put(srv.getId(), srv);
354                     srv.start();
355                 }
356             } else {
357                 opflexServers.put(srv.getId(), srv);
358                 srv.start();
359             }
360         }
361     }
362
363     /**
364      * Drop the list of servers from the domain
365      *
366      * @param oldServers
367      *        The list of servers to drop
368      *        TODO: Should we provide notifications to or close the
369      *        connections that were spawned by the deleted servers?
370      */
371     public void dropServers(List<String> oldServers) {
372         OpflexRpcServer server;
373
374         /*
375          * Check to see if there's a server with this identity, and if so, close
376          * it
377          */
378         for (String srv : oldServers) {
379             if (opflexServers.containsKey(srv)) {
380                 server = opflexServers.remove(srv);
381                 server.getRpcServer().getChannel().disconnect();
382             }
383         }
384     }
385
386     /**
387      * Check the new configuration of the servers against the existing, and if
388      * different, delete the old server and replace it with a new server running
389      * the updated parameters.
390      *
391      * @param serverList
392      *        The new server configurations
393      */
394     public void updateServers(List<OpflexRpcServer> serverList) {
395         /* Get the new list of configured servers in this domain */
396         List<OpflexRpcServer> updateServers = new ArrayList<OpflexRpcServer>();
397         List<OpflexRpcServer> newServers = new ArrayList<OpflexRpcServer>();
398         List<String> newList = new ArrayList<String>();
399
400         for (OpflexRpcServer srv : serverList) {
401             newList.add(srv.getId());
402         }
403
404         /* Get the list of currently configured servers in this domain */
405         List<String> currentList = new ArrayList<String>(opflexServers.keySet());
406
407         /* Make the add/drop/update lists */
408         List<String> addList = new ArrayList<String>(newList);
409         List<String> dropList = new ArrayList<String>(currentList);
410         List<String> updateList = new ArrayList<String>(newList);
411
412         addList.removeAll(currentList);
413         dropList.removeAll(newList);
414         updateList.removeAll(addList);
415
416         /*
417          * Create add and update lists
418          */
419         for (OpflexRpcServer srv : serverList) {
420             if (updateList.contains(srv.getId())) {
421                 updateServers.add(srv);
422             }
423             if (addList.contains(srv.getId())) {
424                 newServers.add(srv);
425             }
426         }
427
428         dropServers(dropList);
429         addServers(newServers);
430         addServers(updateServers);
431     }
432
433     private void readConfig() {
434         ListenableFuture<Optional<DiscoveryDefinitions>> dao = dataProvider.newReadOnlyTransaction().read(
435                 LogicalDatastoreType.CONFIGURATION, DISCOVERY_IID);
436         Futures.addCallback(dao, new FutureCallback<Optional<DiscoveryDefinitions>>() {
437
438             @Override
439             public void onSuccess(final Optional<DiscoveryDefinitions> result) {
440                 getNewConfig(result);
441             }
442
443             @Override
444             public void onFailure(Throwable t) {
445                 logger.error("Failed to read configuration", t);
446             }
447         }, executor);
448     }
449
450     void getNewConfig(final Optional<DiscoveryDefinitions> result) {
451         /*
452          * Get the new list of discovery definitions from the configuration
453          * store, and convert to a list for manipulation
454          */
455         if (!result.isPresent()) {
456             domain = OPFLEX_DOMAIN;
457             if (currentIdentities != null) {
458                 dropServers(new ArrayList<String>(opflexServers.keySet()));
459             }
460             List<OpflexRpcServer> defaults = setDefaultIdentities();
461             addServers(defaults);
462             commitDefaultConfiguration(defaults);
463
464             return;
465         }
466
467         currentIdentities = result.get();
468         if (currentIdentities == null) {
469             dropServers(new ArrayList<String>(opflexServers.keySet()));
470             List<OpflexRpcServer> defaults = setDefaultIdentities();
471             addServers(defaults);
472             commitDefaultConfiguration(defaults);
473         } else {
474             domain = currentIdentities.getDomain();
475             // TODO: what to do about existing agents? keep the same domain?
476             if (domain == null)
477                 domain = OPFLEX_DOMAIN;
478
479             updateServers(createServerList(currentIdentities));
480         }
481     }
482
483     private void commitDefaultConfiguration(List<OpflexRpcServer> servers) {
484         EndpointRegistryBuilder erb = new EndpointRegistryBuilder();
485         PolicyRepositoryBuilder prb = new PolicyRepositoryBuilder();
486         ObserverBuilder ob = new ObserverBuilder();
487         DiscoveryDefinitionsBuilder ddb = new DiscoveryDefinitionsBuilder();
488         for (OpflexRpcServer srv : servers) {
489             if (srv.getRoles().contains(Role.ENDPOINT_REGISTRY)) {
490                 erb.setId(srv.getAddress());
491                 erb.setPort(srv.getPort());
492             }
493             if (srv.getRoles().contains(Role.POLICY_REPOSITORY)) {
494                 prb.setId(srv.getAddress());
495                 prb.setPort(srv.getPort());
496             }
497             if (srv.getRoles().contains(Role.OBSERVER)) {
498                 ob.setId(srv.getAddress());
499                 ob.setPort(srv.getPort());
500             }
501
502         }
503         List<EndpointRegistry> erl = new ArrayList<EndpointRegistry>();
504         List<PolicyRepository> prl = new ArrayList<PolicyRepository>();
505         List<Observer> ol = new ArrayList<Observer>();
506         erl.add(erb.build());
507         prl.add(prb.build());
508         ol.add(ob.build());
509
510         ddb.setEndpointRegistry(erl);
511         ddb.setObserver(ol);
512         ddb.setPolicyRepository(prl);
513         ddb.setDomain(domain);
514         DiscoveryDefinitions identities = ddb.build();
515         WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
516         wt.put(LogicalDatastoreType.CONFIGURATION, DISCOVERY_IID, identities);
517         wt.submit();
518     }
519
520     // ******************
521     // DataChangeListener
522     // ******************
523
524     @Override
525     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
526
527         readConfig();
528     }
529
530     /**
531      * Close the connection service. Implemented from the AutoCloseable
532      * interface.
533      */
534     @Override
535     public void close() throws ExecutionException, InterruptedException {
536         cleanup();
537         dataListener.close();
538     }
539
540     /**
541      * Subscribe to a given {@link RpcMessage}. We synchronize this method to
542      * ensure consistency, and because subscriptions aren't a frequent event.
543      * The list is a copy on write list to ensure that anyone using a list has a
544      * usable copy
545      */
546     @Override
547     public synchronized void subscribe(RpcMessage message, RpcCallback callback) {
548
549         /*
550          * Create a new list, replacing the old
551          */
552         List<RpcCallback> cbList = brokerMap.get(message.getName());
553         if (cbList == null) {
554             cbList = new CopyOnWriteArrayList<RpcCallback>();
555             cbList.add(callback);
556             brokerMap.put(message.getName(), cbList);
557         } else if (!cbList.contains(callback)) {
558             cbList.add(callback);
559             brokerMap.replace(message.getName(), cbList);
560         }
561     }
562
563     /**
564      * Publish the {@link RpcMessage} to all subscribers.
565      */
566     @Override
567     public synchronized void publish(JsonRpcEndpoint endpoint, RpcMessage message) {
568         List<RpcCallback> cbList = brokerMap.get(message.getName());
569         if (cbList == null) {
570             logger.warn("Unhandled Message name is " + message.getName());
571             return;
572         }
573
574         for (RpcCallback cb : cbList) {
575             cb.callback(endpoint, message);
576         }
577     }
578
579     /**
580      * This notification handles the OpFlex Identity request messages.
581      */
582     @Override
583     public void callback(JsonRpcEndpoint endpoint, RpcMessage message) {
584
585         if (!(message instanceof IdentityRequest)) {
586             logger.warn("message is not identity request {}", message);
587             return;
588         }
589         OpflexRpcServer srv = getOpflexServer(endpoint);
590         if (srv == null)
591             return;
592
593         IdentityRequest request = (IdentityRequest) message;
594         IdentityResponse.Result result = new IdentityResponse.Result();
595
596         List<IdentityResponse.Peer> peers = new ArrayList<IdentityResponse.Peer>();
597
598         IdentityResponse response = new IdentityResponse();
599
600         /*
601          * We inherit our role from the server that spawned the connection.
602          */
603         List<String> myRoles = new ArrayList<String>();
604         List<Role> roles = srv.getRoles();
605         if (roles != null) {
606             for (Role r : roles) {
607                 myRoles.add(r.toString());
608             }
609         }
610         result.setMy_role(myRoles);
611
612         /*
613          * The peers field contains the identifiers other than my_role
614          */
615         if (request.getParams() == null || request.getParams().size() <= 0) {
616             return;
617         }
618         if (request.getParams() == null || request.getParams().get(0) == null
619                 || !request.getParams().get(0).getDomain().equals(domain)) {
620             OpflexError error = new OpflexError();
621             error.setMessage(INVALID_DOMAIN);
622             response.setError(error);
623             /* send domain mismatch */
624         } else {
625             for (OpflexRpcServer server : getOpflexServerList()) {
626                 /* Skip our server -- reported in my_role */
627                 // if ( Objects.equals(server.getId(), srv.getId()))
628                 // continue;
629                 roles = server.getRoles();
630                 if (roles != null) {
631                     IdentityResponse.Peer peer = new IdentityResponse.Peer();
632                     peer.setConnectivity_info(server.getId());
633                     List<String> stringRoles = new ArrayList<String>();
634                     for (Role r : roles) {
635                         stringRoles.add(r.toString());
636                     }
637                     peer.setRole(stringRoles);
638                     peers.add(peer);
639                 }
640             }
641             result.setPeers(peers);
642             result.setName(srv.getId());
643             result.setDomain(domain);
644             response.setResult(result);
645         }
646         response.setId(message.getId());
647
648         /*
649          * Collect the set of severs and send in the response
650          */
651         try {
652             endpoint.sendResponse(response);
653         } catch (Exception e) {
654             logger.error("Exception for sending {}, {}", message, e);
655         }
656     }
657
658     /**
659      * This is the notification when a new endpoint has been created. Since the
660      * endpoint is new, we don't have a OpflexConnection for it yet. We create
661      * the OpflexConnection, then retrieve the OpflexRpcServer that created this
662      * connections to inherit some of the fields we need (domain, server).
663      */
664     @Override
665     public void addConnection(JsonRpcEndpoint endpoint) {
666
667         /*
668          * When the connection is added, we only have the JsonRpcEndpoint. We
669          * use the JsonRpcEndpoint's context field to store the server object
670          * that created this connection, and can look up things like the domain,
671          * etc. to create the containing connection object.
672          */
673         if (!(endpoint.getContext() instanceof OpflexRpcServer)) {
674             logger.error("Connection for endpoint {} invalid", endpoint.getIdentifier());
675             // TODO: close connection?
676             return;
677         }
678
679         OpflexRpcServer server = (OpflexRpcServer) endpoint.getContext();
680
681         /*
682          * The OpFlex domain is the same as the server that the agent connected
683          * to. Look up the OpFlex RPC server using the server socket.
684          * 
685          * It's possible that the server was closed or changed between the
686          * connection establishment and now (race condition). Treat that as a
687          * failure, closing the connection.
688          */
689         OpflexAgent oc = new OpflexAgent();
690         oc.setEndpoint(endpoint);
691         oc.setIdentity(endpoint.getIdentifier());
692         oc.setDomain(domain);
693         oc.setOpflexServer(server);
694         oc.setRoles(server.getRoles());
695
696         logger.trace("Adding agent {}", endpoint.getIdentifier());
697         addOpflexAgent(oc);
698
699         /*
700          * Send an Identity Request
701          */
702         IdentityRequest ourId = new IdentityRequest();
703         IdentityRequest.Params params = new IdentityRequest.Params();
704         List<IdentityRequest.Params> paramList = new ArrayList<IdentityRequest.Params>();
705         List<String> myRoles = new ArrayList<String>();
706         List<Role> roles = server.getRoles();
707         if (roles != null) {
708             for (Role r : roles) {
709                 myRoles.add(r.toString());
710             }
711         }
712         params.setMy_role(myRoles);
713         params.setDomain(server.getDomain());
714         params.setName(server.getId());
715         paramList.add(params);
716         ourId.setParams(paramList);
717
718         try {
719             endpoint.sendRequest(ourId);
720         } catch (Exception e) {
721             logger.error("Couldn't send Identity {}", e);
722         }
723     }
724
725     /**
726      * This is the notification we receive when a connection is closed. Retrieve
727      * the domain from the {@link JsonRpcEndpoint}'s context field to get the
728      * {@link OpflexRpcServer}, which contains the OpFlex domain for this
729      * connection, then use the identity from the {@link JsonRpcEndpoint} and
730      * domain to remove the {@link OpflexAgent} from the domain
731      */
732     @Override
733     public void channelClosed(JsonRpcEndpoint endpoint) throws Exception {
734         logger.trace("Connection to Node : {} closed", endpoint.getIdentifier());
735         OpflexAgent agent = getOpflexConnection(endpoint);
736         if (agent != null) {
737             removeOpflexAgent(agent);
738         }
739     }
740
741 }