/*
 * Copyright (C) 2013 Red Hat, Inc. Copyright (C) 2014 Cisco Systems, Inc.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Authors : Madhu Venugopal, Brent Salisbury, Evan Zeller, Thomas Bachman
 */
10 package org.opendaylight.groupbasedpolicy.renderer.opflex.lib;
12 import java.util.ArrayList;
13 import java.util.List;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CopyOnWriteArrayList;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
23 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
27 import org.opendaylight.groupbasedpolicy.jsonrpc.ConnectionService;
28 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
29 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
30 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
31 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.IdentityRequest;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.IdentityResponse;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexError;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitionsBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistryBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.ObserverBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepositoryBuilder;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.binding.DataObject;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 import com.google.common.base.Optional;
50 import com.google.common.util.concurrent.FutureCallback;
51 import com.google.common.util.concurrent.Futures;
52 import com.google.common.util.concurrent.ListenableFuture;
/**
 * Manages the different OpFlex entity connections. It does this on behalf of
 * each logical OpFlex entity:
 * <ul>
 * <li>Policy Repositories</li>
 * <li>Endpoint Registries</li>
 * <li>Observers</li>
 * </ul>
 *
 * Each OpFlex entity defines the JSON RPC methods supported, and manages their
 * connection/discovery using dedicated servers. Servers and connections are
 * maintained in dedicated client and server maps.
 */
66 public class OpflexConnectionService implements ConnectionService, RpcBroker,
67 RpcBroker.RpcCallback, DataChangeListener, AutoCloseable {
68 protected static final Logger logger = LoggerFactory
69 .getLogger(OpflexConnectionService.class);
71 public static final String OPFLEX_DOMAIN = "default";
72 static final String INVALID_DOMAIN = "Domain mismatch";
73 // Properties that can be set in config.ini
74 public static final String OPFLEX_LISTENPORT = "opflex.listenPort";
75 private static final Integer defaultOpflexPort = 6670;
76 public static final String OPFLEX_LISTENIP = "opflex.listenIp";
77 private static final String defaultOpflexIp = "0.0.0.0";
79 private Integer opflexListenPort = defaultOpflexPort;
80 private String opflexListenIp = defaultOpflexIp;
82 private final ScheduledExecutorService executor;
83 private final ListenerRegistration<DataChangeListener> dataListener;
86 private ConcurrentMap<String, OpflexAgent> opflexAgents = null;
87 private ConcurrentMap<String, OpflexRpcServer> opflexServers = null;
88 private ConcurrentMap<String, List<RpcCallback>> brokerMap = null;
90 private DiscoveryDefinitions currentIdentities;
91 private final DataBroker dataProvider;
92 private RpcMessageMap messageMap = null;
94 public static final InstanceIdentifier<DiscoveryDefinitions> DISCOVERY_IID = InstanceIdentifier
95 .builder(DiscoveryDefinitions.class).build();
97 public OpflexConnectionService(DataBroker salDataProvider,
98 ScheduledExecutorService executor) {
99 this.dataProvider = salDataProvider;
100 this.executor = executor;
102 this.opflexAgents = new ConcurrentHashMap<String, OpflexAgent>();
103 this.opflexServers = new ConcurrentHashMap<String, OpflexRpcServer>();
106 /* Subscribe to Discovery messages */
107 this.messageMap = new RpcMessageMap();
108 List<RpcMessage> messages = Role.DISCOVERY.getMessages();
109 this.messageMap.addList(messages);
110 for (RpcMessage msg : messages) {
111 this.subscribe(msg, this);
115 * Check configuration to see which listeners we should be creating
117 int listenPort = defaultOpflexPort;
118 String portString = System.getProperty(OPFLEX_LISTENPORT);
119 if (portString != null) {
120 listenPort = Integer.decode(portString).intValue();
122 this.opflexListenPort = listenPort;
123 String listenIp = defaultOpflexIp;
124 String ipString = System.getProperty(OPFLEX_LISTENIP);
125 if (ipString != null) {
128 this.opflexListenIp = listenIp;
132 this.dataListener = dataProvider.registerDataChangeListener(
133 LogicalDatastoreType.CONFIGURATION,
134 OpflexConnectionService.DISCOVERY_IID, this,
135 DataChangeScope.SUBTREE);
138 private List<OpflexRpcServer> setDefaultIdentities() {
141 * Create a single server, filling all roles
143 String identity = opflexListenIp + ":" + opflexListenPort.toString();
144 List<OpflexRpcServer> srvList = new ArrayList<OpflexRpcServer>();
145 List<Role> roles = new ArrayList<Role>();
146 roles.add(Role.POLICY_REPOSITORY);
147 roles.add(Role.ENDPOINT_REGISTRY);
148 roles.add(Role.OBSERVER);
150 OpflexRpcServer srv = new OpflexRpcServer(domain, identity, roles);
151 srv.setConnectionService(this);
152 srv.setRpcBroker(this);
158 private List<OpflexRpcServer> createServerList(
159 DiscoveryDefinitions identities) {
161 if (identities != null) {
162 Map<String, OpflexRpcServer> servers = new ConcurrentHashMap<String, OpflexRpcServer>();
163 List<String> addList = getPolicyRepositories(identities
164 .getPolicyRepository());
165 addServerList(servers, addList, Role.POLICY_REPOSITORY);
166 addList = getEndpointRegistries(identities.getEndpointRegistry());
167 addServerList(servers, addList, Role.ENDPOINT_REGISTRY);
168 addList = getObservers(identities.getObserver());
169 addServerList(servers, addList, Role.OBSERVER);
170 return (new ArrayList<OpflexRpcServer>(servers.values()));
175 private void initializeServers() {
177 * Get the configured identities, if any. If lists are empty, set up a
178 * single instance of each, using the default interface, all inside a
181 domain = OPFLEX_DOMAIN;
186 private List<String> getPolicyRepositories(
187 List<PolicyRepository> repositories) {
188 List<String> identityList = new ArrayList<String>();
189 if (repositories == null)
191 for (PolicyRepository pr : repositories) {
192 String identity = pr.getId() + ":" + pr.getPort().toString();
193 identityList.add(identity);
198 private List<String> getEndpointRegistries(List<EndpointRegistry> registries) {
199 List<String> identityList = new ArrayList<String>();
200 if (registries == null)
202 for (EndpointRegistry epr : registries) {
203 String identity = epr.getId() + ":" + epr.getPort().toString();
204 identityList.add(identity);
209 private List<String> getObservers(List<Observer> observers) {
210 List<String> identityList = new ArrayList<String>();
211 if (observers == null)
213 for (Observer o : observers) {
214 String identity = o.getId() + ":" + o.getPort().toString();
215 identityList.add(identity);
220 private void addServerList(Map<String, OpflexRpcServer> servers,
221 List<String> idList, Role role) {
222 if (idList == null || idList.size() <= 0)
225 for (String id : idList) {
226 List<Role> roles = new ArrayList<Role>();
227 OpflexRpcServer srv = servers.get(id);
229 roles = srv.getRoles();
234 srv = new OpflexRpcServer(domain, id, roles);
235 srv.setConnectionService(this);
236 srv.setRpcBroker(this);
237 servers.put(id, srv);
243 * Find the {@link OpflexAgent} that owns this {@link JsonRpcEndpoint}.
246 * The endpoint to look up
247 * @return The OpflexConnection that owns this endpoint
249 * TODO: should throw an exception of there is no OpflexConnection
250 * that contains this endpoint
252 public OpflexAgent getOpflexConnection(JsonRpcEndpoint endpoint) {
254 return getOpflexAgent(endpoint.getIdentifier());
258 * Get the OpflexRpcServer that spawned this endpoint.
261 * The endpoint to look up
262 * @return The OpflexRpcServer that owns this endpoint, or null if the
263 * server no longer exists
265 * TODO: exception if the endpoint is owned by anything
267 public OpflexRpcServer getOpflexServer(JsonRpcEndpoint endpoint) {
268 if (endpoint.getContext() instanceof OpflexRpcServer) {
269 return (OpflexRpcServer) endpoint.getContext();
271 logger.warn("Couldn't find OpflexConnection for endpoint {}",
272 endpoint.getIdentifier());
277 * Start the {@link OpflexConnectionService}
279 public synchronized void createBroker() {
280 brokerMap = new ConcurrentHashMap<String, List<RpcCallback>>();
283 public ConcurrentMap<String, OpflexAgent> getOpflexAgents() {
287 public void setOpflexAgents(ConcurrentMap<String, OpflexAgent> opflexAgents) {
288 this.opflexAgents = opflexAgents;
291 public ConcurrentMap<String, OpflexRpcServer> getOpflexServers() {
292 return opflexServers;
295 public void setOpflexServers(
296 ConcurrentMap<String, OpflexRpcServer> opflexServers) {
297 this.opflexServers = opflexServers;
300 public void removeOpflexAgent(OpflexAgent agent) {
301 opflexAgents.remove(agent.getIdentity());
304 public void removeOpflexServer(OpflexRpcServer server) {
305 opflexServers.remove(server.getId());
308 public List<OpflexRpcServer> getOpflexServerList() {
309 return new ArrayList<OpflexRpcServer>(opflexServers.values());
313 * Clean up all the entities contained by this domain. The connection
314 * service also owns these references, so we provide notifications to the
315 * connection service so that it can clean up as well.
317 public void cleanup() {
318 List<String> agents = new ArrayList<String>(opflexAgents.keySet());
319 List<String> servers = new ArrayList<String>(opflexServers.keySet());
320 for (String agent : agents) {
321 OpflexAgent conn = opflexAgents.remove(agent);
322 conn.getEndpoint().getChannel().disconnect();
324 for (String srv : servers) {
325 OpflexRpcServer server = opflexServers.get(srv);
326 if (server.getRpcServer().getChannel() != null) {
327 server.getRpcServer().getChannel().disconnect();
333 * Add an {@link OpflexAgent} to the domain
338 public void addOpflexAgent(OpflexAgent agent) {
339 opflexAgents.put(agent.getIdentity(), agent);
343 * Return the {@link OpflexAgent} associated with this identity
346 * A string representing the connections identity
347 * @return The connection represented by that key, or null if not found
349 public OpflexAgent getOpflexAgent(String identity) {
350 return opflexAgents.get(identity);
354 * Add the List of servers to the domain
357 * List of new servers to start
359 public void addServers(List<OpflexRpcServer> serverList) {
361 if (serverList == null)
365 * Check to see if there's already a server with this identity, and if
366 * so, close it and replace it with this one.
368 for (OpflexRpcServer srv : serverList) {
369 OpflexRpcServer server = opflexServers.get(srv.getId());
370 if (server != null) {
371 if (!server.sameServer(srv)) {
372 OpflexRpcServer oldServer = opflexServers.remove(srv
374 if (oldServer != null && oldServer.getRpcServer() != null
375 && oldServer.getRpcServer().getChannel() != null) {
376 oldServer.getRpcServer().getChannel().disconnect();
378 opflexServers.put(srv.getId(), srv);
382 opflexServers.put(srv.getId(), srv);
389 * Drop the list of servers from the domain
392 * The list of servers to drop
394 * TODO: Should we provide notifications to or close the
395 * connections that were spawned by the deleted servers?
397 public void dropServers(List<String> oldServers) {
398 OpflexRpcServer server;
401 * Check to see if there's a server with this identity, and if so, close
404 for (String srv : oldServers) {
405 if (opflexServers.containsKey(srv)) {
406 server = opflexServers.remove(srv);
407 server.getRpcServer().getChannel().disconnect();
413 * Check the new configuration of the servers against the existing, and if
414 * different, delete the old server and replace it with a new server running
415 * the updated parameters.
418 * The new server configurations
420 public void updateServers(List<OpflexRpcServer> serverList) {
421 /* Get the new list of configured servers in this domain */
422 List<OpflexRpcServer> updateServers = new ArrayList<OpflexRpcServer>();
423 List<OpflexRpcServer> newServers = new ArrayList<OpflexRpcServer>();
424 List<String> newList = new ArrayList<String>();
426 for (OpflexRpcServer srv : serverList) {
427 newList.add(srv.getId());
430 /* Get the list of currently configured servers in this domain */
431 List<String> currentList = new ArrayList<String>(opflexServers.keySet());
433 /* Make the add/drop/update lists */
434 List<String> addList = new ArrayList<String>(newList);
435 List<String> dropList = new ArrayList<String>(currentList);
436 List<String> updateList = new ArrayList<String>(newList);
438 addList.removeAll(currentList);
439 dropList.removeAll(newList);
440 updateList.removeAll(addList);
443 * Create add and update lists
445 for (OpflexRpcServer srv : serverList) {
446 if (updateList.contains(srv.getId())) {
447 updateServers.add(srv);
449 if (addList.contains(srv.getId())) {
454 dropServers(dropList);
455 addServers(newServers);
456 addServers(updateServers);
459 private void readConfig() {
460 ListenableFuture<Optional<DiscoveryDefinitions>> dao = dataProvider
461 .newReadOnlyTransaction().read(
462 LogicalDatastoreType.CONFIGURATION, DISCOVERY_IID);
463 Futures.addCallback(dao,
464 new FutureCallback<Optional<DiscoveryDefinitions>>() {
466 public void onSuccess(
467 final Optional<DiscoveryDefinitions> result) {
468 getNewConfig(result);
472 public void onFailure(Throwable t) {
473 logger.error("Failed to read configuration", t);
478 void getNewConfig(final Optional<DiscoveryDefinitions> result) {
480 * Get the new list of discovery definitions from the configuration
481 * store, and convert to a list for manipulation
483 if (!result.isPresent()) {
484 domain = OPFLEX_DOMAIN;
485 if (currentIdentities != null) {
486 dropServers(new ArrayList<String>(opflexServers.keySet()));
488 List<OpflexRpcServer> defaults = setDefaultIdentities();
489 addServers(defaults);
490 commitDefaultConfiguration(defaults);
495 currentIdentities = result.get();
496 if (currentIdentities == null) {
497 dropServers(new ArrayList<String>(opflexServers.keySet()));
498 List<OpflexRpcServer> defaults = setDefaultIdentities();
499 addServers(defaults);
500 commitDefaultConfiguration(defaults);
502 domain = currentIdentities.getDomain();
503 // TODO: what to do about existing agents? keep the same domain?
505 domain = OPFLEX_DOMAIN;
507 updateServers(createServerList(currentIdentities));
511 private void commitDefaultConfiguration(List<OpflexRpcServer> servers) {
512 EndpointRegistryBuilder erb = new EndpointRegistryBuilder();
513 PolicyRepositoryBuilder prb = new PolicyRepositoryBuilder();
514 ObserverBuilder ob = new ObserverBuilder();
515 DiscoveryDefinitionsBuilder ddb = new DiscoveryDefinitionsBuilder();
516 for (OpflexRpcServer srv : servers) {
517 if (srv.getRoles().contains(Role.ENDPOINT_REGISTRY)) {
518 erb.setId(srv.getAddress());
519 erb.setPort(srv.getPort());
521 if (srv.getRoles().contains(Role.POLICY_REPOSITORY)) {
522 prb.setId(srv.getAddress());
523 prb.setPort(srv.getPort());
525 if (srv.getRoles().contains(Role.OBSERVER)) {
526 ob.setId(srv.getAddress());
527 ob.setPort(srv.getPort());
531 List<EndpointRegistry> erl = new ArrayList<EndpointRegistry>();
532 List<PolicyRepository> prl = new ArrayList<PolicyRepository>();
533 List<Observer> ol = new ArrayList<Observer>();
534 erl.add(erb.build());
535 prl.add(prb.build());
538 ddb.setEndpointRegistry(erl);
540 ddb.setPolicyRepository(prl);
541 ddb.setDomain(domain);
542 DiscoveryDefinitions identities = ddb.build();
543 WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
544 wt.put(LogicalDatastoreType.CONFIGURATION, DISCOVERY_IID, identities);
548 // ******************
549 // DataChangeListener
550 // ******************
553 public void onDataChanged(
554 final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
560 * Close the connection service. Implemented from the AutoCloseable
564 public void close() throws ExecutionException, InterruptedException {
566 dataListener.close();
570 * Subscribe to a given {@link RpcMessage}. We synchronize this method to
571 * ensure consistency, and because subscriptions aren't a frequent event.
572 * The list is a copy on write list to ensure that anyone using a list has a
576 public synchronized void subscribe(RpcMessage message, RpcCallback callback) {
579 * Create a new list, replacing the old
581 List<RpcCallback> cbList = brokerMap.get(message.getName());
582 if (cbList == null) {
583 cbList = new CopyOnWriteArrayList<RpcCallback>();
584 cbList.add(callback);
585 brokerMap.put(message.getName(), cbList);
586 } else if (!cbList.contains(callback)) {
587 cbList.add(callback);
588 brokerMap.replace(message.getName(), cbList);
593 * Publish the {@link RpcMessage} to all subscribers.
597 public synchronized void publish(JsonRpcEndpoint endpoint,
598 RpcMessage message) {
599 List<RpcCallback> cbList = brokerMap.get(message.getName());
600 if (cbList == null) {
602 .println("Unhandled Message name is " + message.getName());
606 for (RpcCallback cb : cbList) {
607 cb.callback(endpoint, message);
612 * This notification handles the OpFlex Identity request messages.
615 public void callback(JsonRpcEndpoint endpoint, RpcMessage message) {
617 if (!(message instanceof IdentityRequest)) {
618 logger.warn("message is not identity request {}", message);
621 OpflexRpcServer srv = getOpflexServer(endpoint);
625 IdentityRequest request = (IdentityRequest) message;
626 IdentityResponse.Result result = new IdentityResponse.Result();
628 List<IdentityResponse.Peer> peers = new ArrayList<IdentityResponse.Peer>();
630 IdentityResponse response = new IdentityResponse();
633 * We inherit our role from the server that spawned the connection.
635 List<String> myRoles = new ArrayList<String>();
636 List<Role> roles = srv.getRoles();
638 for (Role r : roles) {
639 myRoles.add(r.toString());
642 result.setMy_role(myRoles);
645 * The peers field contains the identifiers other than my_role
647 if (request.getParams() == null || request.getParams().size() <= 0) {
650 if (request.getParams() == null || request.getParams().get(0) == null
651 || !request.getParams().get(0).getDomain().equals(domain)) {
652 OpflexError error = new OpflexError();
653 error.setMessage(INVALID_DOMAIN);
654 response.setError(error);
655 /* send domain mismatch */
657 for (OpflexRpcServer server : getOpflexServerList()) {
658 /* Skip our server -- reported in my_role */
659 // if ( Objects.equals(server.getId(), srv.getId()))
661 roles = server.getRoles();
663 IdentityResponse.Peer peer = new IdentityResponse.Peer();
664 peer.setConnectivity_info(server.getId());
665 List<String> stringRoles = new ArrayList<String>();
666 for (Role r : roles) {
667 stringRoles.add(r.toString());
669 peer.setRole(stringRoles);
673 result.setPeers(peers);
674 result.setName(srv.getId());
675 result.setDomain(domain);
676 response.setResult(result);
678 response.setId(message.getId());
681 * Collect the set of severs and send in the response
684 endpoint.sendResponse(response);
685 } catch (Throwable e) {
686 logger.error("Throwable for sending {}, {}", message, e);
691 * This is the notification when a new endpoint has been created. Since the
692 * endpoint is new, we don't have a OpflexConnection for it yet. We create
693 * the OpflexConnection, then retrieve the OpflexRpcServer that created this
694 * connections to inherit some of the fields we need (domain, server).
697 public void addConnection(JsonRpcEndpoint endpoint) {
700 * When the connection is added, we only have the JsonRpcEndpoint. We
701 * use the JsonRpcEndpoint's context field to store the server object
702 * that created this connection, and can look up things like the domain,
703 * etc. to create the containing connection object.
705 if (!(endpoint.getContext() instanceof OpflexRpcServer)) {
706 logger.error("Connection for endpoint {} invalid",
707 endpoint.getIdentifier());
708 // TODO: close connection?
712 OpflexRpcServer server = (OpflexRpcServer) endpoint.getContext();
715 * The OpFlex domain is the same as the server that the agent connected
716 * to. Look up the OpFlex RPC server using the server socket.
718 * It's possible that the server was closed or changed between the
719 * connection establishment and now (race condition). Treat that as a
720 * failure, closing the connection.
722 OpflexAgent oc = new OpflexAgent();
723 oc.setEndpoint(endpoint);
724 oc.setIdentity(endpoint.getIdentifier());
725 oc.setDomain(domain);
726 oc.setOpflexServer(server);
727 oc.setRoles(server.getRoles());
729 logger.trace("Adding agent {}", endpoint.getIdentifier());
733 * Send an Identity Request
735 IdentityRequest ourId = new IdentityRequest();
736 IdentityRequest.Params params = new IdentityRequest.Params();
737 List<IdentityRequest.Params> paramList = new ArrayList<IdentityRequest.Params>();
738 List<String> myRoles = new ArrayList<String>();
739 List<Role> roles = server.getRoles();
741 for (Role r : roles) {
742 myRoles.add(r.toString());
745 params.setMy_role(myRoles);
746 params.setDomain(server.getDomain());
747 params.setName(server.getId());
748 paramList.add(params);
749 ourId.setParams(paramList);
752 endpoint.sendRequest(ourId);
753 } catch (Throwable t) {
754 logger.error("Couldn't send Identity {}", t);
759 * This is the notification we receive when a connection is closed. Retrieve
760 * the domain from the {@link JsonRpcEndpoint}'s context field to get the
761 * {@link OpflexRpcServer}, which contains the OpFlex domain for this
762 * connection, then use the identity from the {@link JsonRpcEndpoint} and
763 * domain to remove the {@link OpflexAgent} from the domain
766 public void channelClosed(JsonRpcEndpoint endpoint) throws Exception {
767 logger.trace("Connection to Node : {} closed", endpoint.getIdentifier());
768 OpflexAgent agent = getOpflexConnection(endpoint);
770 removeOpflexAgent(agent);