2 * Copyright (C) 2013 Red Hat, Inc. Copyright (C) 2014 Cisco Systems, Inc.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller, Thomas Bachman
10 package org.opendaylight.groupbasedpolicy.renderer.opflex.lib;
12 import java.util.ArrayList;
13 import java.util.List;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CopyOnWriteArrayList;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
23 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
27 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.ConnectionService;
28 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.JsonRpcEndpoint;
29 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcBroker;
30 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessage;
31 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessageMap;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.IdentityRequest;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.IdentityResponse;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexError;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitionsBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistryBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.ObserverBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepositoryBuilder;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.binding.DataObject;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 import com.google.common.base.Optional;
50 import com.google.common.util.concurrent.FutureCallback;
51 import com.google.common.util.concurrent.Futures;
52 import com.google.common.util.concurrent.ListenableFuture;
55 * Manages the different OpFlex entity connections. It does this on behalf of
56 * each logical OpFlex entity: o Policy Repositories o Endpoint Registries o
58 * Each OpFlex entity defines the JSON RPC methods supported, and manages their
59 * connection/discovery using dedicated servers. Servers and connections are
60 * maintained in dedicated client and server maps.
64 public class OpflexConnectionService implements ConnectionService, RpcBroker, RpcBroker.RpcCallback,
65 DataChangeListener, AutoCloseable {
67 protected static final Logger logger = LoggerFactory.getLogger(OpflexConnectionService.class);
69 public static final String OPFLEX_DOMAIN = "default";
70 static final String INVALID_DOMAIN = "Domain mismatch";
71 // Properties that can be set in config.ini
72 public static final String OPFLEX_LISTENPORT = "opflex.listenPort";
73 private static final Integer defaultOpflexPort = 6670;
74 public static final String OPFLEX_LISTENIP = "opflex.listenIp";
75 private static final String defaultOpflexIp = "0.0.0.0";
77 private Integer opflexListenPort = defaultOpflexPort;
78 private String opflexListenIp = defaultOpflexIp;
80 private final ScheduledExecutorService executor;
81 private final ListenerRegistration<DataChangeListener> dataListener;
84 private final Map<String, OpflexAgent> opflexAgents = new ConcurrentHashMap<>();
85 private final Map<String, OpflexRpcServer> opflexServers = new ConcurrentHashMap<>();
86 private ConcurrentMap<String, List<RpcCallback>> brokerMap = null;
88 private DiscoveryDefinitions currentIdentities;
89 private final DataBroker dataProvider;
90 private final RpcMessageMap messageMap = new RpcMessageMap();
92 public static final InstanceIdentifier<DiscoveryDefinitions> DISCOVERY_IID = InstanceIdentifier.builder(
93 DiscoveryDefinitions.class).build();
95 public OpflexConnectionService(DataBroker salDataProvider, ScheduledExecutorService executor) {
96 this.dataProvider = salDataProvider;
97 this.executor = executor;
101 /* Subscribe to Discovery messages */
102 List<RpcMessage> messages = Role.DISCOVERY.getMessages();
103 this.messageMap.addList(messages);
104 for (RpcMessage msg : messages) {
105 this.subscribe(msg, this);
109 * Check configuration to see which listeners we should be creating
111 int listenPort = defaultOpflexPort;
112 String portString = System.getProperty(OPFLEX_LISTENPORT);
113 if (portString != null) {
114 listenPort = Integer.decode(portString).intValue();
116 this.opflexListenPort = listenPort;
117 String listenIp = defaultOpflexIp;
118 String ipString = System.getProperty(OPFLEX_LISTENIP);
119 if (ipString != null) {
122 this.opflexListenIp = listenIp;
126 this.dataListener = dataProvider.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
127 OpflexConnectionService.DISCOVERY_IID, this, DataChangeScope.SUBTREE);
130 private List<OpflexRpcServer> setDefaultIdentities() {
133 * Create a single server, filling all roles
135 String identity = opflexListenIp + ":" + opflexListenPort.toString();
136 List<OpflexRpcServer> srvList = new ArrayList<OpflexRpcServer>();
137 List<Role> roles = new ArrayList<Role>();
138 roles.add(Role.POLICY_REPOSITORY);
139 roles.add(Role.ENDPOINT_REGISTRY);
140 roles.add(Role.OBSERVER);
142 OpflexRpcServer srv = new OpflexRpcServer(domain, identity, roles);
143 srv.setConnectionService(this);
144 srv.setRpcBroker(this);
150 private List<OpflexRpcServer> createServerList(DiscoveryDefinitions identities) {
152 if (identities != null) {
153 Map<String, OpflexRpcServer> servers = new ConcurrentHashMap<String, OpflexRpcServer>();
154 List<String> addList = getPolicyRepositories(identities.getPolicyRepository());
155 addServerList(servers, addList, Role.POLICY_REPOSITORY);
156 addList = getEndpointRegistries(identities.getEndpointRegistry());
157 addServerList(servers, addList, Role.ENDPOINT_REGISTRY);
158 addList = getObservers(identities.getObserver());
159 addServerList(servers, addList, Role.OBSERVER);
160 return (new ArrayList<OpflexRpcServer>(servers.values()));
165 private void initializeServers() {
167 * Get the configured identities, if any. If lists are empty, set up a
168 * single instance of each, using the default interface, all inside a
171 domain = OPFLEX_DOMAIN;
176 private List<String> getPolicyRepositories(List<PolicyRepository> repositories) {
177 List<String> identityList = new ArrayList<String>();
178 if (repositories == null)
180 for (PolicyRepository pr : repositories) {
181 String identity = pr.getId() + ":" + pr.getPort().toString();
182 identityList.add(identity);
187 private List<String> getEndpointRegistries(List<EndpointRegistry> registries) {
188 List<String> identityList = new ArrayList<String>();
189 if (registries == null)
191 for (EndpointRegistry epr : registries) {
192 String identity = epr.getId() + ":" + epr.getPort().toString();
193 identityList.add(identity);
198 private List<String> getObservers(List<Observer> observers) {
199 List<String> identityList = new ArrayList<String>();
200 if (observers == null)
202 for (Observer o : observers) {
203 String identity = o.getId() + ":" + o.getPort().toString();
204 identityList.add(identity);
209 private void addServerList(Map<String, OpflexRpcServer> servers, List<String> idList, Role role) {
210 if (idList == null || idList.size() <= 0)
213 for (String id : idList) {
214 List<Role> roles = new ArrayList<Role>();
215 OpflexRpcServer srv = servers.get(id);
217 roles = srv.getRoles();
222 srv = new OpflexRpcServer(domain, id, roles);
223 srv.setConnectionService(this);
224 srv.setRpcBroker(this);
225 servers.put(id, srv);
231 * Find the {@link OpflexAgent} that owns this {@link JsonRpcEndpoint}.
234 * The endpoint to look up
235 * @return The OpflexConnection that owns this endpoint
236 * TODO: should throw an exception of there is no OpflexConnection
237 * that contains this endpoint
239 public OpflexAgent getOpflexConnection(JsonRpcEndpoint endpoint) {
241 return getOpflexAgent(endpoint.getIdentifier());
245 * Get the OpflexRpcServer that spawned this endpoint.
248 * The endpoint to look up
249 * @return The OpflexRpcServer that owns this endpoint, or null if the
250 * server no longer exists
251 * TODO: exception if the endpoint is owned by anything
253 public OpflexRpcServer getOpflexServer(JsonRpcEndpoint endpoint) {
254 if (endpoint.getContext() instanceof OpflexRpcServer) {
255 return (OpflexRpcServer) endpoint.getContext();
257 logger.warn("Couldn't find OpflexConnection for endpoint {}", endpoint.getIdentifier());
262 * Start the {@link OpflexConnectionService}
264 public synchronized void createBroker() {
265 brokerMap = new ConcurrentHashMap<String, List<RpcCallback>>();
268 public Map<String, OpflexAgent> getOpflexAgents() {
272 public Map<String, OpflexRpcServer> getOpflexServers() {
273 return opflexServers;
276 public void removeOpflexAgent(OpflexAgent agent) {
277 opflexAgents.remove(agent.getIdentity());
280 public void removeOpflexServer(OpflexRpcServer server) {
281 opflexServers.remove(server.getId());
284 public List<OpflexRpcServer> getOpflexServerList() {
285 return new ArrayList<OpflexRpcServer>(opflexServers.values());
289 * Clean up all the entities contained by this domain. The connection
290 * service also owns these references, so we provide notifications to the
291 * connection service so that it can clean up as well.
293 public void cleanup() {
294 List<String> agents = new ArrayList<String>(opflexAgents.keySet());
295 List<String> servers = new ArrayList<String>(opflexServers.keySet());
296 for (String agent : agents) {
297 OpflexAgent conn = opflexAgents.remove(agent);
298 conn.getEndpoint().getChannel().disconnect();
300 for (String srv : servers) {
301 OpflexRpcServer server = opflexServers.get(srv);
302 if (server.getRpcServer().getChannel() != null) {
303 server.getRpcServer().getChannel().disconnect();
309 * Add an {@link OpflexAgent} to the domain
314 public void addOpflexAgent(OpflexAgent agent) {
315 opflexAgents.put(agent.getIdentity(), agent);
319 * Return the {@link OpflexAgent} associated with this identity
322 * A string representing the connections identity
323 * @return The connection represented by that key, or null if not found
325 public OpflexAgent getOpflexAgent(String identity) {
326 return opflexAgents.get(identity);
330 * Add the List of servers to the domain
333 * List of new servers to start
335 public void addServers(List<OpflexRpcServer> serverList) {
337 if (serverList == null)
341 * Check to see if there's already a server with this identity, and if
342 * so, close it and replace it with this one.
344 for (OpflexRpcServer srv : serverList) {
345 OpflexRpcServer server = opflexServers.get(srv.getId());
346 if (server != null) {
347 if (!server.sameServer(srv)) {
348 OpflexRpcServer oldServer = opflexServers.remove(srv.getId());
349 if (oldServer != null && oldServer.getRpcServer() != null
350 && oldServer.getRpcServer().getChannel() != null) {
351 oldServer.getRpcServer().getChannel().disconnect();
353 opflexServers.put(srv.getId(), srv);
357 opflexServers.put(srv.getId(), srv);
364 * Drop the list of servers from the domain
367 * The list of servers to drop
368 * TODO: Should we provide notifications to or close the
369 * connections that were spawned by the deleted servers?
371 public void dropServers(List<String> oldServers) {
372 OpflexRpcServer server;
375 * Check to see if there's a server with this identity, and if so, close
378 for (String srv : oldServers) {
379 if (opflexServers.containsKey(srv)) {
380 server = opflexServers.remove(srv);
381 server.getRpcServer().getChannel().disconnect();
387 * Check the new configuration of the servers against the existing, and if
388 * different, delete the old server and replace it with a new server running
389 * the updated parameters.
392 * The new server configurations
394 public void updateServers(List<OpflexRpcServer> serverList) {
395 /* Get the new list of configured servers in this domain */
396 List<OpflexRpcServer> updateServers = new ArrayList<OpflexRpcServer>();
397 List<OpflexRpcServer> newServers = new ArrayList<OpflexRpcServer>();
398 List<String> newList = new ArrayList<String>();
400 for (OpflexRpcServer srv : serverList) {
401 newList.add(srv.getId());
404 /* Get the list of currently configured servers in this domain */
405 List<String> currentList = new ArrayList<String>(opflexServers.keySet());
407 /* Make the add/drop/update lists */
408 List<String> addList = new ArrayList<String>(newList);
409 List<String> dropList = new ArrayList<String>(currentList);
410 List<String> updateList = new ArrayList<String>(newList);
412 addList.removeAll(currentList);
413 dropList.removeAll(newList);
414 updateList.removeAll(addList);
417 * Create add and update lists
419 for (OpflexRpcServer srv : serverList) {
420 if (updateList.contains(srv.getId())) {
421 updateServers.add(srv);
423 if (addList.contains(srv.getId())) {
428 dropServers(dropList);
429 addServers(newServers);
430 addServers(updateServers);
433 private void readConfig() {
434 ListenableFuture<Optional<DiscoveryDefinitions>> dao = dataProvider.newReadOnlyTransaction().read(
435 LogicalDatastoreType.CONFIGURATION, DISCOVERY_IID);
436 Futures.addCallback(dao, new FutureCallback<Optional<DiscoveryDefinitions>>() {
439 public void onSuccess(final Optional<DiscoveryDefinitions> result) {
440 getNewConfig(result);
444 public void onFailure(Throwable t) {
445 logger.error("Failed to read configuration", t);
450 void getNewConfig(final Optional<DiscoveryDefinitions> result) {
452 * Get the new list of discovery definitions from the configuration
453 * store, and convert to a list for manipulation
455 if (!result.isPresent()) {
456 domain = OPFLEX_DOMAIN;
457 if (currentIdentities != null) {
458 dropServers(new ArrayList<String>(opflexServers.keySet()));
460 List<OpflexRpcServer> defaults = setDefaultIdentities();
461 addServers(defaults);
462 commitDefaultConfiguration(defaults);
467 currentIdentities = result.get();
468 if (currentIdentities == null) {
469 dropServers(new ArrayList<String>(opflexServers.keySet()));
470 List<OpflexRpcServer> defaults = setDefaultIdentities();
471 addServers(defaults);
472 commitDefaultConfiguration(defaults);
474 domain = currentIdentities.getDomain();
475 // TODO: what to do about existing agents? keep the same domain?
477 domain = OPFLEX_DOMAIN;
479 updateServers(createServerList(currentIdentities));
483 private void commitDefaultConfiguration(List<OpflexRpcServer> servers) {
484 EndpointRegistryBuilder erb = new EndpointRegistryBuilder();
485 PolicyRepositoryBuilder prb = new PolicyRepositoryBuilder();
486 ObserverBuilder ob = new ObserverBuilder();
487 DiscoveryDefinitionsBuilder ddb = new DiscoveryDefinitionsBuilder();
488 for (OpflexRpcServer srv : servers) {
489 if (srv.getRoles().contains(Role.ENDPOINT_REGISTRY)) {
490 erb.setId(srv.getAddress());
491 erb.setPort(srv.getPort());
493 if (srv.getRoles().contains(Role.POLICY_REPOSITORY)) {
494 prb.setId(srv.getAddress());
495 prb.setPort(srv.getPort());
497 if (srv.getRoles().contains(Role.OBSERVER)) {
498 ob.setId(srv.getAddress());
499 ob.setPort(srv.getPort());
503 List<EndpointRegistry> erl = new ArrayList<EndpointRegistry>();
504 List<PolicyRepository> prl = new ArrayList<PolicyRepository>();
505 List<Observer> ol = new ArrayList<Observer>();
506 erl.add(erb.build());
507 prl.add(prb.build());
510 ddb.setEndpointRegistry(erl);
512 ddb.setPolicyRepository(prl);
513 ddb.setDomain(domain);
514 DiscoveryDefinitions identities = ddb.build();
515 WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
516 wt.put(LogicalDatastoreType.CONFIGURATION, DISCOVERY_IID, identities);
520 // ******************
521 // DataChangeListener
522 // ******************
525 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
531 * Close the connection service. Implemented from the AutoCloseable
535 public void close() throws ExecutionException, InterruptedException {
537 dataListener.close();
541 * Subscribe to a given {@link RpcMessage}. We synchronize this method to
542 * ensure consistency, and because subscriptions aren't a frequent event.
543 * The list is a copy on write list to ensure that anyone using a list has a
547 public synchronized void subscribe(RpcMessage message, RpcCallback callback) {
550 * Create a new list, replacing the old
552 List<RpcCallback> cbList = brokerMap.get(message.getName());
553 if (cbList == null) {
554 cbList = new CopyOnWriteArrayList<RpcCallback>();
555 cbList.add(callback);
556 brokerMap.put(message.getName(), cbList);
557 } else if (!cbList.contains(callback)) {
558 cbList.add(callback);
559 brokerMap.replace(message.getName(), cbList);
564 * Publish the {@link RpcMessage} to all subscribers.
567 public synchronized void publish(JsonRpcEndpoint endpoint, RpcMessage message) {
568 List<RpcCallback> cbList = brokerMap.get(message.getName());
569 if (cbList == null) {
570 logger.warn("Unhandled Message name is " + message.getName());
574 for (RpcCallback cb : cbList) {
575 cb.callback(endpoint, message);
580 * This notification handles the OpFlex Identity request messages.
583 public void callback(JsonRpcEndpoint endpoint, RpcMessage message) {
585 if (!(message instanceof IdentityRequest)) {
586 logger.warn("message is not identity request {}", message);
589 OpflexRpcServer srv = getOpflexServer(endpoint);
593 IdentityRequest request = (IdentityRequest) message;
594 IdentityResponse.Result result = new IdentityResponse.Result();
596 List<IdentityResponse.Peer> peers = new ArrayList<IdentityResponse.Peer>();
598 IdentityResponse response = new IdentityResponse();
601 * We inherit our role from the server that spawned the connection.
603 List<String> myRoles = new ArrayList<String>();
604 List<Role> roles = srv.getRoles();
606 for (Role r : roles) {
607 myRoles.add(r.toString());
610 result.setMy_role(myRoles);
613 * The peers field contains the identifiers other than my_role
615 if (request.getParams() == null || request.getParams().size() <= 0) {
618 if (request.getParams() == null || request.getParams().get(0) == null
619 || !request.getParams().get(0).getDomain().equals(domain)) {
620 OpflexError error = new OpflexError();
621 error.setMessage(INVALID_DOMAIN);
622 response.setError(error);
623 /* send domain mismatch */
625 for (OpflexRpcServer server : getOpflexServerList()) {
626 /* Skip our server -- reported in my_role */
627 // if ( Objects.equals(server.getId(), srv.getId()))
629 roles = server.getRoles();
631 IdentityResponse.Peer peer = new IdentityResponse.Peer();
632 peer.setConnectivity_info(server.getId());
633 List<String> stringRoles = new ArrayList<String>();
634 for (Role r : roles) {
635 stringRoles.add(r.toString());
637 peer.setRole(stringRoles);
641 result.setPeers(peers);
642 result.setName(srv.getId());
643 result.setDomain(domain);
644 response.setResult(result);
646 response.setId(message.getId());
649 * Collect the set of severs and send in the response
652 endpoint.sendResponse(response);
653 } catch (Exception e) {
654 logger.error("Exception for sending {}, {}", message, e);
659 * This is the notification when a new endpoint has been created. Since the
660 * endpoint is new, we don't have a OpflexConnection for it yet. We create
661 * the OpflexConnection, then retrieve the OpflexRpcServer that created this
662 * connections to inherit some of the fields we need (domain, server).
665 public void addConnection(JsonRpcEndpoint endpoint) {
668 * When the connection is added, we only have the JsonRpcEndpoint. We
669 * use the JsonRpcEndpoint's context field to store the server object
670 * that created this connection, and can look up things like the domain,
671 * etc. to create the containing connection object.
673 if (!(endpoint.getContext() instanceof OpflexRpcServer)) {
674 logger.error("Connection for endpoint {} invalid", endpoint.getIdentifier());
675 // TODO: close connection?
679 OpflexRpcServer server = (OpflexRpcServer) endpoint.getContext();
682 * The OpFlex domain is the same as the server that the agent connected
683 * to. Look up the OpFlex RPC server using the server socket.
685 * It's possible that the server was closed or changed between the
686 * connection establishment and now (race condition). Treat that as a
687 * failure, closing the connection.
689 OpflexAgent oc = new OpflexAgent();
690 oc.setEndpoint(endpoint);
691 oc.setIdentity(endpoint.getIdentifier());
692 oc.setDomain(domain);
693 oc.setOpflexServer(server);
694 oc.setRoles(server.getRoles());
696 logger.trace("Adding agent {}", endpoint.getIdentifier());
700 * Send an Identity Request
702 IdentityRequest ourId = new IdentityRequest();
703 IdentityRequest.Params params = new IdentityRequest.Params();
704 List<IdentityRequest.Params> paramList = new ArrayList<IdentityRequest.Params>();
705 List<String> myRoles = new ArrayList<String>();
706 List<Role> roles = server.getRoles();
708 for (Role r : roles) {
709 myRoles.add(r.toString());
712 params.setMy_role(myRoles);
713 params.setDomain(server.getDomain());
714 params.setName(server.getId());
715 paramList.add(params);
716 ourId.setParams(paramList);
719 endpoint.sendRequest(ourId);
720 } catch (Exception e) {
721 logger.error("Couldn't send Identity {}", e);
726 * This is the notification we receive when a connection is closed. Retrieve
727 * the domain from the {@link JsonRpcEndpoint}'s context field to get the
728 * {@link OpflexRpcServer}, which contains the OpFlex domain for this
729 * connection, then use the identity from the {@link JsonRpcEndpoint} and
730 * domain to remove the {@link OpflexAgent} from the domain
733 public void channelClosed(JsonRpcEndpoint endpoint) throws Exception {
734 logger.trace("Connection to Node : {} closed", endpoint.getIdentifier());
735 OpflexAgent agent = getOpflexConnection(endpoint);
737 removeOpflexAgent(agent);