Separating renderers into features.
[groupbasedpolicy.git] / renderers / opflex / src / main / java / org / opendaylight / groupbasedpolicy / renderer / opflex / lib / OpflexConnectionService.java
1 /*
2  * Copyright (C) 2013 Red Hat, Inc. Copyright (C) 2014 Cisco Systems, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller, Thomas Bachman
9  */
10 package org.opendaylight.groupbasedpolicy.renderer.opflex.lib;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CopyOnWriteArrayList;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
20
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
23 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
27 import org.opendaylight.groupbasedpolicy.jsonrpc.ConnectionService;
28 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
29 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
30 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
31 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.IdentityRequest;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.IdentityResponse;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexError;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitionsBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistryBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.ObserverBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepositoryBuilder;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.binding.DataObject;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import com.google.common.base.Optional;
50 import com.google.common.util.concurrent.FutureCallback;
51 import com.google.common.util.concurrent.Futures;
52 import com.google.common.util.concurrent.ListenableFuture;
53
54 /**
55  * Manages the different OpFlex entity connections. It does this on behalf of
56  * each logical OpFlex entity: o Policy Repositories o Endpoint Registries o
57  * Observers
58  *
59  * Each OpFlex entity defines the JSON RPC methods supported, and manages their
60  * connection/discovery using dedicated servers. Servers and connections are
61  * maintained in dedicated client and server maps.
62  *
63  * @author tbachman
64  *
65  */
66 public class OpflexConnectionService implements ConnectionService, RpcBroker,
67         RpcBroker.RpcCallback, DataChangeListener, AutoCloseable {
68     protected static final Logger logger = LoggerFactory
69             .getLogger(OpflexConnectionService.class);
70
71     public static final String OPFLEX_DOMAIN = "default";
72     static final String INVALID_DOMAIN = "Domain mismatch";
73     // Properties that can be set in config.ini
74     public static final String OPFLEX_LISTENPORT = "opflex.listenPort";
75     private static final Integer defaultOpflexPort = 6670;
76     public static final String OPFLEX_LISTENIP = "opflex.listenIp";
77     private static final String defaultOpflexIp = "0.0.0.0";
78
79     private Integer opflexListenPort = defaultOpflexPort;
80     private String opflexListenIp = defaultOpflexIp;
81
82     private final ScheduledExecutorService executor;
83     private final ListenerRegistration<DataChangeListener> dataListener;
84
85     String domain;
86     private ConcurrentMap<String, OpflexAgent> opflexAgents = null;
87     private ConcurrentMap<String, OpflexRpcServer> opflexServers = null;
88     private ConcurrentMap<String, List<RpcCallback>> brokerMap = null;
89
90     private DiscoveryDefinitions currentIdentities;
91     private final DataBroker dataProvider;
92     private RpcMessageMap messageMap = null;
93
94     public static final InstanceIdentifier<DiscoveryDefinitions> DISCOVERY_IID = InstanceIdentifier
95             .builder(DiscoveryDefinitions.class).build();
96
97     public OpflexConnectionService(DataBroker salDataProvider,
98             ScheduledExecutorService executor) {
99         this.dataProvider = salDataProvider;
100         this.executor = executor;
101
102         this.opflexAgents = new ConcurrentHashMap<String, OpflexAgent>();
103         this.opflexServers = new ConcurrentHashMap<String, OpflexRpcServer>();
104         createBroker();
105
106         /* Subscribe to Discovery messages */
107         this.messageMap = new RpcMessageMap();
108         List<RpcMessage> messages = Role.DISCOVERY.getMessages();
109         this.messageMap.addList(messages);
110         for (RpcMessage msg : messages) {
111             this.subscribe(msg, this);
112         }
113
114         /*
115          * Check configuration to see which listeners we should be creating
116          */
117         int listenPort = defaultOpflexPort;
118         String portString = System.getProperty(OPFLEX_LISTENPORT);
119         if (portString != null) {
120             listenPort = Integer.decode(portString).intValue();
121         }
122         this.opflexListenPort = listenPort;
123         String listenIp = defaultOpflexIp;
124         String ipString = System.getProperty(OPFLEX_LISTENIP);
125         if (ipString != null) {
126             listenIp = ipString;
127         }
128         this.opflexListenIp = listenIp;
129
130         initializeServers();
131
132         this.dataListener = dataProvider.registerDataChangeListener(
133                 LogicalDatastoreType.CONFIGURATION,
134                 OpflexConnectionService.DISCOVERY_IID, this,
135                 DataChangeScope.SUBTREE);
136     }
137
138     private List<OpflexRpcServer> setDefaultIdentities() {
139
140         /*
141          * Create a single server, filling all roles
142          */
143         String identity = opflexListenIp + ":" + opflexListenPort.toString();
144         List<OpflexRpcServer> srvList = new ArrayList<OpflexRpcServer>();
145         List<Role> roles = new ArrayList<Role>();
146         roles.add(Role.POLICY_REPOSITORY);
147         roles.add(Role.ENDPOINT_REGISTRY);
148         roles.add(Role.OBSERVER);
149
150         OpflexRpcServer srv = new OpflexRpcServer(domain, identity, roles);
151         srv.setConnectionService(this);
152         srv.setRpcBroker(this);
153         srvList.add(srv);
154         return srvList;
155
156     }
157
158     private List<OpflexRpcServer> createServerList(
159             DiscoveryDefinitions identities) {
160
161         if (identities != null) {
162             Map<String, OpflexRpcServer> servers = new ConcurrentHashMap<String, OpflexRpcServer>();
163             List<String> addList = getPolicyRepositories(identities
164                     .getPolicyRepository());
165             addServerList(servers, addList, Role.POLICY_REPOSITORY);
166             addList = getEndpointRegistries(identities.getEndpointRegistry());
167             addServerList(servers, addList, Role.ENDPOINT_REGISTRY);
168             addList = getObservers(identities.getObserver());
169             addServerList(servers, addList, Role.OBSERVER);
170             return (new ArrayList<OpflexRpcServer>(servers.values()));
171         }
172         return null;
173     }
174
175     private void initializeServers() {
176         /*
177          * Get the configured identities, if any. If lists are empty, set up a
178          * single instance of each, using the default interface, all inside a
179          * default domain
180          */
181         domain = OPFLEX_DOMAIN;
182         readConfig();
183
184     }
185
186     private List<String> getPolicyRepositories(
187             List<PolicyRepository> repositories) {
188         List<String> identityList = new ArrayList<String>();
189         if (repositories == null)
190             return null;
191         for (PolicyRepository pr : repositories) {
192             String identity = pr.getId() + ":" + pr.getPort().toString();
193             identityList.add(identity);
194         }
195         return identityList;
196     }
197
198     private List<String> getEndpointRegistries(List<EndpointRegistry> registries) {
199         List<String> identityList = new ArrayList<String>();
200         if (registries == null)
201             return null;
202         for (EndpointRegistry epr : registries) {
203             String identity = epr.getId() + ":" + epr.getPort().toString();
204             identityList.add(identity);
205         }
206         return identityList;
207     }
208
209     private List<String> getObservers(List<Observer> observers) {
210         List<String> identityList = new ArrayList<String>();
211         if (observers == null)
212             return null;
213         for (Observer o : observers) {
214             String identity = o.getId() + ":" + o.getPort().toString();
215             identityList.add(identity);
216         }
217         return identityList;
218     }
219
220     private void addServerList(Map<String, OpflexRpcServer> servers,
221             List<String> idList, Role role) {
222         if (idList == null || idList.size() <= 0)
223             return;
224
225         for (String id : idList) {
226             List<Role> roles = new ArrayList<Role>();
227             OpflexRpcServer srv = servers.get(id);
228             if (srv != null) {
229                 roles = srv.getRoles();
230                 servers.remove(id);
231             }
232
233             roles.add(role);
234             srv = new OpflexRpcServer(domain, id, roles);
235             srv.setConnectionService(this);
236             srv.setRpcBroker(this);
237             servers.put(id, srv);
238         }
239
240     }
241
242     /**
243      * Find the {@link OpflexAgent} that owns this {@link JsonRpcEndpoint}.
244      *
245      * @param endpoint
246      *            The endpoint to look up
247      * @return The OpflexConnection that owns this endpoint
248      *
249      *         TODO: should throw an exception of there is no OpflexConnection
250      *         that contains this endpoint
251      */
252     public OpflexAgent getOpflexConnection(JsonRpcEndpoint endpoint) {
253
254         return getOpflexAgent(endpoint.getIdentifier());
255     }
256
257     /**
258      * Get the OpflexRpcServer that spawned this endpoint.
259      *
260      * @param endpoint
261      *            The endpoint to look up
262      * @return The OpflexRpcServer that owns this endpoint, or null if the
263      *         server no longer exists
264      *
265      *         TODO: exception if the endpoint is owned by anything
266      */
267     public OpflexRpcServer getOpflexServer(JsonRpcEndpoint endpoint) {
268         if (endpoint.getContext() instanceof OpflexRpcServer) {
269             return (OpflexRpcServer) endpoint.getContext();
270         }
271         logger.warn("Couldn't find OpflexConnection for endpoint {}",
272                 endpoint.getIdentifier());
273         return null;
274     }
275
276     /**
277      * Start the {@link OpflexConnectionService}
278      */
279     public synchronized void createBroker() {
280         brokerMap = new ConcurrentHashMap<String, List<RpcCallback>>();
281     }
282
283     public ConcurrentMap<String, OpflexAgent> getOpflexAgents() {
284         return opflexAgents;
285     }
286
287     public void setOpflexAgents(ConcurrentMap<String, OpflexAgent> opflexAgents) {
288         this.opflexAgents = opflexAgents;
289     }
290
291     public ConcurrentMap<String, OpflexRpcServer> getOpflexServers() {
292         return opflexServers;
293     }
294
295     public void setOpflexServers(
296             ConcurrentMap<String, OpflexRpcServer> opflexServers) {
297         this.opflexServers = opflexServers;
298     }
299
300     public void removeOpflexAgent(OpflexAgent agent) {
301         opflexAgents.remove(agent.getIdentity());
302     }
303
304     public void removeOpflexServer(OpflexRpcServer server) {
305         opflexServers.remove(server.getId());
306     }
307
308     public List<OpflexRpcServer> getOpflexServerList() {
309         return new ArrayList<OpflexRpcServer>(opflexServers.values());
310     }
311
312     /**
313      * Clean up all the entities contained by this domain. The connection
314      * service also owns these references, so we provide notifications to the
315      * connection service so that it can clean up as well.
316      */
317     public void cleanup() {
318         List<String> agents = new ArrayList<String>(opflexAgents.keySet());
319         List<String> servers = new ArrayList<String>(opflexServers.keySet());
320         for (String agent : agents) {
321             OpflexAgent conn = opflexAgents.remove(agent);
322             conn.getEndpoint().getChannel().disconnect();
323         }
324         for (String srv : servers) {
325             OpflexRpcServer server = opflexServers.get(srv);
326             if (server.getRpcServer().getChannel() != null) {
327                 server.getRpcServer().getChannel().disconnect();
328             }
329         }
330     }
331
332     /**
333      * Add an {@link OpflexAgent} to the domain
334      *
335      * @param agent
336      *            The agent to add
337      */
338     public void addOpflexAgent(OpflexAgent agent) {
339         opflexAgents.put(agent.getIdentity(), agent);
340     }
341
342     /**
343      * Return the {@link OpflexAgent} associated with this identity
344      *
345      * @param identity
346      *            A string representing the connections identity
347      * @return The connection represented by that key, or null if not found
348      */
349     public OpflexAgent getOpflexAgent(String identity) {
350         return opflexAgents.get(identity);
351     }
352
353     /**
354      * Add the List of servers to the domain
355      *
356      * @param serverList
357      *            List of new servers to start
358      */
359     public void addServers(List<OpflexRpcServer> serverList) {
360
361         if (serverList == null)
362             return;
363
364         /*
365          * Check to see if there's already a server with this identity, and if
366          * so, close it and replace it with this one.
367          */
368         for (OpflexRpcServer srv : serverList) {
369             OpflexRpcServer server = opflexServers.get(srv.getId());
370             if (server != null) {
371                 if (!server.sameServer(srv)) {
372                     OpflexRpcServer oldServer = opflexServers.remove(srv
373                             .getId());
374                     if (oldServer != null && oldServer.getRpcServer() != null
375                             && oldServer.getRpcServer().getChannel() != null) {
376                         oldServer.getRpcServer().getChannel().disconnect();
377                     }
378                     opflexServers.put(srv.getId(), srv);
379                     srv.start();
380                 }
381             } else {
382                 opflexServers.put(srv.getId(), srv);
383                 srv.start();
384             }
385         }
386     }
387
388     /**
389      * Drop the list of servers from the domain
390      *
391      * @param oldServers
392      *            The list of servers to drop
393      *
394      *            TODO: Should we provide notifications to or close the
395      *            connections that were spawned by the deleted servers?
396      */
397     public void dropServers(List<String> oldServers) {
398         OpflexRpcServer server;
399
400         /*
401          * Check to see if there's a server with this identity, and if so, close
402          * it
403          */
404         for (String srv : oldServers) {
405             if (opflexServers.containsKey(srv)) {
406                 server = opflexServers.remove(srv);
407                 server.getRpcServer().getChannel().disconnect();
408             }
409         }
410     }
411
412     /**
413      * Check the new configuration of the servers against the existing, and if
414      * different, delete the old server and replace it with a new server running
415      * the updated parameters.
416      *
417      * @param serverList
418      *            The new server configurations
419      */
420     public void updateServers(List<OpflexRpcServer> serverList) {
421         /* Get the new list of configured servers in this domain */
422         List<OpflexRpcServer> updateServers = new ArrayList<OpflexRpcServer>();
423         List<OpflexRpcServer> newServers = new ArrayList<OpflexRpcServer>();
424         List<String> newList = new ArrayList<String>();
425
426         for (OpflexRpcServer srv : serverList) {
427             newList.add(srv.getId());
428         }
429
430         /* Get the list of currently configured servers in this domain */
431         List<String> currentList = new ArrayList<String>(opflexServers.keySet());
432
433         /* Make the add/drop/update lists */
434         List<String> addList = new ArrayList<String>(newList);
435         List<String> dropList = new ArrayList<String>(currentList);
436         List<String> updateList = new ArrayList<String>(newList);
437
438         addList.removeAll(currentList);
439         dropList.removeAll(newList);
440         updateList.removeAll(addList);
441
442         /*
443          * Create add and update lists
444          */
445         for (OpflexRpcServer srv : serverList) {
446             if (updateList.contains(srv.getId())) {
447                 updateServers.add(srv);
448             }
449             if (addList.contains(srv.getId())) {
450                 newServers.add(srv);
451             }
452         }
453
454         dropServers(dropList);
455         addServers(newServers);
456         addServers(updateServers);
457     }
458
459     private void readConfig() {
460         ListenableFuture<Optional<DiscoveryDefinitions>> dao = dataProvider
461                 .newReadOnlyTransaction().read(
462                         LogicalDatastoreType.CONFIGURATION, DISCOVERY_IID);
463         Futures.addCallback(dao,
464                 new FutureCallback<Optional<DiscoveryDefinitions>>() {
465                     @Override
466                     public void onSuccess(
467                             final Optional<DiscoveryDefinitions> result) {
468                         getNewConfig(result);
469                     }
470
471                     @Override
472                     public void onFailure(Throwable t) {
473                         logger.error("Failed to read configuration", t);
474                     }
475                 }, executor);
476     }
477
478     void getNewConfig(final Optional<DiscoveryDefinitions> result) {
479         /*
480          * Get the new list of discovery definitions from the configuration
481          * store, and convert to a list for manipulation
482          */
483         if (!result.isPresent()) {
484             domain = OPFLEX_DOMAIN;
485             if (currentIdentities != null) {
486                 dropServers(new ArrayList<String>(opflexServers.keySet()));
487             }
488             List<OpflexRpcServer> defaults = setDefaultIdentities();
489             addServers(defaults);
490             commitDefaultConfiguration(defaults);
491
492             return;
493         }
494
495         currentIdentities = result.get();
496         if (currentIdentities == null) {
497             dropServers(new ArrayList<String>(opflexServers.keySet()));
498             List<OpflexRpcServer> defaults = setDefaultIdentities();
499             addServers(defaults);
500             commitDefaultConfiguration(defaults);
501         } else {
502             domain = currentIdentities.getDomain();
503             // TODO: what to do about existing agents? keep the same domain?
504             if (domain == null)
505                 domain = OPFLEX_DOMAIN;
506
507             updateServers(createServerList(currentIdentities));
508         }
509     }
510
511     private void commitDefaultConfiguration(List<OpflexRpcServer> servers) {
512         EndpointRegistryBuilder erb = new EndpointRegistryBuilder();
513         PolicyRepositoryBuilder prb = new PolicyRepositoryBuilder();
514         ObserverBuilder ob = new ObserverBuilder();
515         DiscoveryDefinitionsBuilder ddb = new DiscoveryDefinitionsBuilder();
516         for (OpflexRpcServer srv : servers) {
517             if (srv.getRoles().contains(Role.ENDPOINT_REGISTRY)) {
518                 erb.setId(srv.getAddress());
519                 erb.setPort(srv.getPort());
520             }
521             if (srv.getRoles().contains(Role.POLICY_REPOSITORY)) {
522                 prb.setId(srv.getAddress());
523                 prb.setPort(srv.getPort());
524             }
525             if (srv.getRoles().contains(Role.OBSERVER)) {
526                 ob.setId(srv.getAddress());
527                 ob.setPort(srv.getPort());
528             }
529
530         }
531         List<EndpointRegistry> erl = new ArrayList<EndpointRegistry>();
532         List<PolicyRepository> prl = new ArrayList<PolicyRepository>();
533         List<Observer> ol = new ArrayList<Observer>();
534         erl.add(erb.build());
535         prl.add(prb.build());
536         ol.add(ob.build());
537
538         ddb.setEndpointRegistry(erl);
539         ddb.setObserver(ol);
540         ddb.setPolicyRepository(prl);
541         ddb.setDomain(domain);
542         DiscoveryDefinitions identities = ddb.build();
543         WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
544         wt.put(LogicalDatastoreType.CONFIGURATION, DISCOVERY_IID, identities);
545         wt.submit();
546     }
547
548     // ******************
549     // DataChangeListener
550     // ******************
551
552     @Override
553     public void onDataChanged(
554             final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
555
556         readConfig();
557     }
558
559     /**
560      * Close the connection service. Implemented from the AutoCloseable
561      * interface.
562      */
563     @Override
564     public void close() throws ExecutionException, InterruptedException {
565         cleanup();
566         dataListener.close();
567     }
568
569     /**
570      * Subscribe to a given {@link RpcMessage}. We synchronize this method to
571      * ensure consistency, and because subscriptions aren't a frequent event.
572      * The list is a copy on write list to ensure that anyone using a list has a
573      * usable copy
574      */
575     @Override
576     public synchronized void subscribe(RpcMessage message, RpcCallback callback) {
577
578         /*
579          * Create a new list, replacing the old
580          */
581         List<RpcCallback> cbList = brokerMap.get(message.getName());
582         if (cbList == null) {
583             cbList = new CopyOnWriteArrayList<RpcCallback>();
584             cbList.add(callback);
585             brokerMap.put(message.getName(), cbList);
586         } else if (!cbList.contains(callback)) {
587             cbList.add(callback);
588             brokerMap.replace(message.getName(), cbList);
589         }
590     }
591
592     /**
593      * Publish the {@link RpcMessage} to all subscribers.
594      *
595      */
596     @Override
597     public synchronized void publish(JsonRpcEndpoint endpoint,
598             RpcMessage message) {
599         List<RpcCallback> cbList = brokerMap.get(message.getName());
600         if (cbList == null) {
601             System.out
602                     .println("Unhandled Message name is " + message.getName());
603             return;
604         }
605
606         for (RpcCallback cb : cbList) {
607             cb.callback(endpoint, message);
608         }
609     }
610
611     /**
612      * This notification handles the OpFlex Identity request messages.
613      */
614     @Override
615     public void callback(JsonRpcEndpoint endpoint, RpcMessage message) {
616
617         if (!(message instanceof IdentityRequest)) {
618             logger.warn("message is not identity request {}", message);
619             return;
620         }
621         OpflexRpcServer srv = getOpflexServer(endpoint);
622         if (srv == null)
623             return;
624
625         IdentityRequest request = (IdentityRequest) message;
626         IdentityResponse.Result result = new IdentityResponse.Result();
627
628         List<IdentityResponse.Peer> peers = new ArrayList<IdentityResponse.Peer>();
629
630         IdentityResponse response = new IdentityResponse();
631
632         /*
633          * We inherit our role from the server that spawned the connection.
634          */
635         List<String> myRoles = new ArrayList<String>();
636         List<Role> roles = srv.getRoles();
637         if (roles != null) {
638             for (Role r : roles) {
639                 myRoles.add(r.toString());
640             }
641         }
642         result.setMy_role(myRoles);
643
644         /*
645          * The peers field contains the identifiers other than my_role
646          */
647         if (request.getParams() == null || request.getParams().size() <= 0) {
648             return;
649         }
650         if (request.getParams() == null || request.getParams().get(0) == null
651                 || !request.getParams().get(0).getDomain().equals(domain)) {
652             OpflexError error = new OpflexError();
653             error.setMessage(INVALID_DOMAIN);
654             response.setError(error);
655             /* send domain mismatch */
656         } else {
657             for (OpflexRpcServer server : getOpflexServerList()) {
658                 /* Skip our server -- reported in my_role */
659                 // if ( Objects.equals(server.getId(), srv.getId()))
660                 // continue;
661                 roles = server.getRoles();
662                 if (roles != null) {
663                     IdentityResponse.Peer peer = new IdentityResponse.Peer();
664                     peer.setConnectivity_info(server.getId());
665                     List<String> stringRoles = new ArrayList<String>();
666                     for (Role r : roles) {
667                         stringRoles.add(r.toString());
668                     }
669                     peer.setRole(stringRoles);
670                     peers.add(peer);
671                 }
672             }
673             result.setPeers(peers);
674             result.setName(srv.getId());
675             result.setDomain(domain);
676             response.setResult(result);
677         }
678         response.setId(message.getId());
679
680         /*
681          * Collect the set of severs and send in the response
682          */
683         try {
684             endpoint.sendResponse(response);
685         } catch (Throwable e) {
686             logger.error("Throwable for sending {}, {}", message, e);
687         }
688     }
689
690     /**
691      * This is the notification when a new endpoint has been created. Since the
692      * endpoint is new, we don't have a OpflexConnection for it yet. We create
693      * the OpflexConnection, then retrieve the OpflexRpcServer that created this
694      * connections to inherit some of the fields we need (domain, server).
695      */
696     @Override
697     public void addConnection(JsonRpcEndpoint endpoint) {
698
699         /*
700          * When the connection is added, we only have the JsonRpcEndpoint. We
701          * use the JsonRpcEndpoint's context field to store the server object
702          * that created this connection, and can look up things like the domain,
703          * etc. to create the containing connection object.
704          */
705         if (!(endpoint.getContext() instanceof OpflexRpcServer)) {
706             logger.error("Connection for endpoint {} invalid",
707                     endpoint.getIdentifier());
708             // TODO: close connection?
709             return;
710         }
711
712         OpflexRpcServer server = (OpflexRpcServer) endpoint.getContext();
713
714         /*
715          * The OpFlex domain is the same as the server that the agent connected
716          * to. Look up the OpFlex RPC server using the server socket.
717          *
718          * It's possible that the server was closed or changed between the
719          * connection establishment and now (race condition). Treat that as a
720          * failure, closing the connection.
721          */
722         OpflexAgent oc = new OpflexAgent();
723         oc.setEndpoint(endpoint);
724         oc.setIdentity(endpoint.getIdentifier());
725         oc.setDomain(domain);
726         oc.setOpflexServer(server);
727         oc.setRoles(server.getRoles());
728
729         logger.trace("Adding agent {}", endpoint.getIdentifier());
730         addOpflexAgent(oc);
731
732         /*
733          * Send an Identity Request
734          */
735         IdentityRequest ourId = new IdentityRequest();
736         IdentityRequest.Params params = new IdentityRequest.Params();
737         List<IdentityRequest.Params> paramList = new ArrayList<IdentityRequest.Params>();
738         List<String> myRoles = new ArrayList<String>();
739         List<Role> roles = server.getRoles();
740         if (roles != null) {
741             for (Role r : roles) {
742                 myRoles.add(r.toString());
743             }
744         }
745         params.setMy_role(myRoles);
746         params.setDomain(server.getDomain());
747         params.setName(server.getId());
748         paramList.add(params);
749         ourId.setParams(paramList);
750
751         try {
752             endpoint.sendRequest(ourId);
753         } catch (Throwable t) {
754             logger.error("Couldn't send Identity {}", t);
755         }
756     }
757
758     /**
759      * This is the notification we receive when a connection is closed. Retrieve
760      * the domain from the {@link JsonRpcEndpoint}'s context field to get the
761      * {@link OpflexRpcServer}, which contains the OpFlex domain for this
762      * connection, then use the identity from the {@link JsonRpcEndpoint} and
763      * domain to remove the {@link OpflexAgent} from the domain
764      */
765     @Override
766     public void channelClosed(JsonRpcEndpoint endpoint) throws Exception {
767         logger.trace("Connection to Node : {} closed", endpoint.getIdentifier());
768         OpflexAgent agent = getOpflexConnection(endpoint);
769         if (agent != null) {
770             removeOpflexAgent(agent);
771         }
772     }
773
774 }