2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.opflex;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.List;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
21 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.JsonRpcEndpoint;
22 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcBroker;
23 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessage;
24 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessageMap;
25 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent;
26 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService;
27 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role;
28 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.ManagedObject;
29 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexError;
30 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveRequest;
31 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveResponse;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveRequest;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveResponse;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUpdateRequest;
35 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.MitLib;
36 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.PolicyUri;
37 import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
38 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
39 import org.opendaylight.groupbasedpolicy.resolver.IndexedTenant;
40 import org.opendaylight.groupbasedpolicy.resolver.Policy;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
45 import org.opendaylight.groupbasedpolicy.resolver.RuleGroup;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ConditionName;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.Contract;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 import com.google.common.collect.Lists;
55 import com.google.common.collect.Sets;
58 * Manage policies on agents by subscribing to updates from the
59 * policy resolver and information about endpoints from the endpoint
64 public class PolicyManager implements PolicyListener, RpcBroker.RpcCallback, AutoCloseable {
/** Logger used by the policy manager and by the update tasks it schedules. */
private static final Logger LOG = LoggerFactory.getLogger(PolicyManager.class);

/** Message placed in the error returned for a policy URI that does not name an EPG. */
private static final String UKNOWN_POLICY = "unknown policy name";

/*
 * Collaborators supplied through the constructor.
 *
 * NOTE(review): the comment that used to sit here described tables
 * indexed as <String:Managed Object DN> <String:agent ID> <Policy:policy>.
 * The only table visible in this extract is epgSubscriptions below,
 * which is keyed by EPG -- confirm whether the older description still
 * applies to anything.
 */
final PolicyResolver policyResolver;
final OpflexConnectionService connectionService;
// Runs the PolicyUpdate tasks submitted by sendPolicyUpdate.
final ScheduledExecutorService executor;
// Stored by the constructor; not referenced by any other code visible here.
private final MitLib mitLibrary;
// Scope handle returned by policyResolver.registerListener(this); EPGs are
// added to and removed from it as agents subscribe and unsubscribe.
private final PolicyScope policyScope;

// For each EPG, the identifiers of the agents that asked to resolve it.
private final ConcurrentHashMap<EgKey, Set<String>> epgSubscriptions = new ConcurrentHashMap<>();
// Messages this manager subscribed to; callback() uses it to reject
// messages that were delivered without a subscription.
private RpcMessageMap messageMap = null;
84 public PolicyManager(PolicyResolver policyResolver, OpflexConnectionService connectionService,
85 ScheduledExecutorService executor, MitLib mitLibrary) {
87 this.executor = executor;
88 this.policyResolver = policyResolver;
89 this.connectionService = connectionService;
90 this.mitLibrary = mitLibrary;
92 /* Subscribe to PR messages */
93 messageMap = new RpcMessageMap();
94 List<RpcMessage> messages = Role.POLICY_REPOSITORY.getMessages();
95 messageMap.addList(messages);
96 for (RpcMessage msg : messages) {
97 this.connectionService.subscribe(msg, this);
100 policyScope = policyResolver.registerListener(this);
102 LOG.debug("Initialized OpFlex policy manager");
/**
 * Shut down the {@link PolicyManager}. Implemented from the
 * AutoCloseable interface.
 *
 * @throws ExecutionException declared by the signature
 * @throws InterruptedException declared by the signature
 */
public void close() throws ExecutionException, InterruptedException {
    // NOTE(review): no body lines are visible in this extract; presumably
    // there is nothing to release here -- confirm against the full file.
}
119 public void policyUpdated(Set<EgKey> updatedConsumers) {
121 sendPolicyUpdates(updatedConsumers);
129 * Update all policy on all agents as needed. Note that this will block
130 * one of the threads on the executor.
134 private void sendPolicyUpdates(Set<EgKey> updatedConsumers) {
135 Map<String, Set<EgKey>> agentMap = new HashMap<String, Set<EgKey>>();
137 PolicyInfo info = policyResolver.getCurrentPolicy();
142 * First build a per-agent set of EPGs that need updating
144 for (EgKey cepg : updatedConsumers) {
147 * Find the set of agents that have subscribed to
148 * updates for this EPG
150 for (String agentId : epgSubscriptions.get(cepg)) {
151 Set<EgKey> egSet = agentMap.get(agentId);
153 egSet = Collections.newSetFromMap(new ConcurrentHashMap<EgKey, Boolean>());
154 agentMap.put(agentId, egSet);
161 * Go through each agent and provide a single update for all EPGs
163 for (Map.Entry<String, Set<EgKey>> entry : agentMap.entrySet()) {
164 OpflexAgent agent = connectionService.getOpflexAgent(entry.getKey());
168 sendPolicyUpdate(agent.getEndpoint(), entry.getValue(), info);
174 * This implements Runnable, which allows the {@link ScheduledExecutorService} to execute the
175 * run() method to implement the update
179 private class PolicyUpdate implements Runnable {
181 private final JsonRpcEndpoint agent;
182 private final Set<EgKey> epgSet;
183 private final PolicyInfo info;
185 PolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
187 this.epgSet = epgSet;
193 List<ManagedObject> subtrees = new ArrayList<ManagedObject>();
195 PolicyUpdateRequest request = new PolicyUpdateRequest();
196 List<PolicyUpdateRequest.Params> paramsList = new ArrayList<>();
197 PolicyUpdateRequest.Params params = new PolicyUpdateRequest.Params();
200 * We may need to optimize this in the future. Currently
201 * we send down the EPG MOs and all the related policy
202 * that's in scope from the PolicyResolver. If we want
203 * to optimize this in the future to only send the policy
204 * objects that changed, we'd either have to change the
205 * PolicyResolver to provide this delta, or we'd have to
206 * keep cached state for each node.
208 for (EgKey epg : epgSet) {
210 * Get EPGs from the IndexedTenant, as the EPGs from
211 * the IndexedTenenat alread has collapsed the EPGs
212 * (i.e. inheritance accounted for)
217 IndexedTenant it = policyResolver.getTenant(epg.getTenantId());
218 List<ManagedObject> relatedMos = getPolicy(epg, info, it);
219 subtrees.addAll(relatedMos);
223 * Currently not using delete URI or merge_children MOs
225 params.setDelete_uri(new ArrayList<Uri>());
226 params.setMerge_children(new ArrayList<ManagedObject>());
227 params.setReplace(subtrees);
228 paramsList.add(params);
229 request.setParams(paramsList);
231 agent.sendRequest(request);
232 } catch (Throwable t) {
238 void sendPolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
239 executor.execute(new PolicyUpdate(agent, epgSet, info));
/**
 * This method creates {@link ManagedObject} POJOs for all of the
 * policy objects that need to be sent as a result of policy
 * resolution for the given EPG.
 *
 * For every peer EPG that has a non-empty policy with the requested EPG,
 * the rule groups are turned into EPG MOs, contract MOs and their
 * sub-MOs. Contract MOs are merged when several rule groups refer to
 * the same contract.
 *
 * @param epg The Endpoint Group that was resolved
 * @param policySnapshot A snapshot of the current resolved policy
 * @param it The indexed tenant used to look up the requested EPG and
 *        passed on when the contract MOs are built
 * @return the collected MOs as a list. NOTE(review): presumably
 *         {@code null} when there is no snapshot or the EPG has no
 *         peers -- the return statements are missing from this extract
 *         and are inferred from the null check in callback().
 */
private List<ManagedObject> getPolicy(EgKey epg, PolicyInfo policySnapshot, IndexedTenant it) {
    if (policySnapshot == null)
        // NOTE(review): inferred statement, not visible in the extract.
        return null;

    // A set, so an MO reported for several rule groups is kept only once.
    Set<ManagedObject> policyMos = Sets.newHashSet();
    Set<EgKey> peers = policySnapshot.getPeers(epg);

    if (peers == null || peers.size() <= 0)
        // NOTE(review): inferred statement, not visible in the extract.
        return null;

    // Allocate an MO for the requested EPG
    ManagedObject epgMo = new ManagedObject();
    for (EgKey depg : peers) {
        /*
         * Construct the base URI, so that we can
         * continue adding on to create child MOs.
         * We use the peer EPG for getting the policy
         */
        PolicyUri uri = new PolicyUri();
        uri.push(MessageUtils.TENANTS_RN);
        uri.push(MessageUtils.TENANT_RN);
        uri.push(depg.getTenantId().getValue());

        Policy policy = policySnapshot.getPolicy(epg, depg);
        if (policy == null || policy == Policy.EMPTY)
            // NOTE(review): inferred statement, not visible in the extract.
            continue;

        /*
         * We now have a policy that we need to send to the agent.
         * Provide empty condition lists for now - need to be
         * an actual empty list, instead of null
         *
         * TODO: get actual condition groups
         */
        List<ConditionName> conds = new ArrayList<ConditionName>();
        ConditionGroup cgSrc = policySnapshot.getEgCondGroup(epg, conds);
        ConditionGroup cgDst = policySnapshot.getEgCondGroup(depg, conds);
        List<RuleGroup> rgl = policy.getRules(cgSrc, cgDst);

        /*
         * RuleGroups can refer to the same contract. As result,
         * we need to keep track of contracts returned and merge
         * the results into a single ManagedObject
         */
        Map<Contract, ManagedObject> contracts = new ConcurrentHashMap<Contract, ManagedObject>();

        for (RuleGroup rg : rgl) {

            /*
             * Construct a new URI for the EPG requested.
             * In this case, we want the requested EPG, not
             * the peer EPG.
             */
            PolicyUri puri = new PolicyUri();
            puri.push(MessageUtils.TENANTS_RN);
            puri.push(MessageUtils.TENANT_RN);
            puri.push(epg.getTenantId().getValue());
            puri.push(MessageUtils.EPG_RN);
            puri.push(epg.getEgId().getValue());
            Set<ManagedObject> epgMos = MessageUtils.getEndpointGroupMo(epgMo, puri,
                    it.getEndpointGroup(epg.getEgId()), rg);
            if (epgMos != null) {
                policyMos.addAll(epgMos);
            }

            Contract c = rg.getRelatedContract();
            /*
             * This cmol list is used as a container to pass
             * an out parameter for the contract MO. This MO
             * is returned separately from the others because
             * it may require merging -- different RuleGroup
             * objects can refer to the same contract
             */
            List<ManagedObject> cmol = new ArrayList<>();

            // Extend the shared per-peer URI down to this contract.
            uri.push(MessageUtils.CONTRACT_RN);
            uri.push(c.getId().getValue());
            List<ManagedObject> mol = MessageUtils.getContractAndSubMos(cmol, uri, c, rg, it);
            // NOTE(review): three lines are missing from the extract at
            // this point; a null guard on mol is presumed -- confirm. If
            // it really is a bare "continue", the two URI segments pushed
            // above are not popped on that path.
            if (mol == null)
                continue;

            // walk back to the tenant for next contract URI
            // NOTE(review): the two pops are inferred from the comment
            // above and the two pushes before the call; the statements
            // themselves are missing from the extract.
            uri.pop();
            uri.pop();

            if (contracts.get(c) != null) {
                /*
                 * Aggregate the child URIs and properties.
                 */
                MessageUtils.mergeMos(contracts.get(c), cmol.remove(0));
            } else {
                contracts.put(c, cmol.remove(0));
            }
            policyMos.addAll(mol);
        }
        // add in the EPG
        policyMos.add(epgMo);
        // add in the contracts
        policyMos.addAll(contracts.values());
    }
    return Lists.newArrayList(policyMos);
}
353 private void addPolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
354 policyScope.addToScope(epgId.getTenantId(), epgId.getEgId());
356 Set<String> agents = epgSubscriptions.get(epgId);
357 if (agents == null) {
358 agents = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
359 Set<String> result = epgSubscriptions.putIfAbsent(epgId, agents);
360 if (result != null) {
364 agents.add(endpoint.getIdentifier());
368 private void removePolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
369 Set<String> agents = epgSubscriptions.get(epgId);
370 if (agents != null) {
371 agents.remove(endpoint.getIdentifier());
373 policyScope.removeFromScope(epgId.getTenantId(), epgId.getEgId());
377 public void callback(JsonRpcEndpoint endpoint, RpcMessage request) {
379 if (messageMap.get(request.getMethod()) == null) {
380 LOG.warn("message {} was not subscribed to, but was delivered.", request);
384 RpcMessage response = null;
386 if (request instanceof PolicyResolveRequest) {
387 PolicyResolveRequest req = (PolicyResolveRequest) request;
388 PolicyResolveResponse msg = new PolicyResolveResponse();
389 PolicyResolveResponse.Result result = new PolicyResolveResponse.Result();
390 msg.setId(request.getId());
393 LOG.warn("Invalid resolve request: {}", req);
394 OpflexError error = new OpflexError();
395 error.setCode(OpflexError.ErrorCode.ERROR.toString());
396 // error.setData(data);
397 // error.setMessage(message);
398 // error.setTrace(trace);
402 List<ManagedObject> mol = new ArrayList<>();
404 for (PolicyResolveRequest.Params p : req.getParams()) {
406 // Skip this if we already have an error
407 if (msg.getError() != null)
411 * Only Policy Identity or Policy URI is present.
412 * Convert Policy Identities to a URI that we can use
414 Uri policyUri = p.getPolicy_uri();
415 if (policyUri == null) {
416 Uri rn = p.getPolicy_ident().getContext();
417 String name = p.getPolicy_ident().getName();
418 PolicyUri puri = new PolicyUri(rn.getValue());
420 policyUri = puri.getUri();
423 // See if the request has an EPG in the URI
424 if (MessageUtils.hasEpg(policyUri.getValue())) {
426 * Keep track of EPGs requested by agents.
428 EndpointGroupId egid = new EndpointGroupId(
429 MessageUtils.getEndpointGroupFromUri(policyUri.getValue()));
430 TenantId tid = new TenantId(MessageUtils.getTenantFromUri(policyUri.getValue()));
431 EgKey epgId = new EgKey(tid, egid);
433 addPolicySubscription(endpoint, epgId);
435 IndexedTenant it = policyResolver.getTenant(tid);
437 List<ManagedObject> relatedMos = getPolicy(epgId, policyResolver.getCurrentPolicy(), it);
438 if (relatedMos != null) {
439 mol.addAll(relatedMos);
443 OpflexError error = new OpflexError();
444 error.setMessage(UKNOWN_POLICY);
445 error.setCode(OpflexError.ErrorCode.EUNSUPPORTED.toString());
446 // error.setData(data);
447 // error.setTrace(trace);
452 result.setPolicy(mol);
453 msg.setResult(result);
455 } else if (request instanceof PolicyUnresolveRequest) {
456 PolicyUnresolveRequest req = (PolicyUnresolveRequest) request;
457 PolicyUnresolveResponse msg = new PolicyUnresolveResponse();
458 msg.setId(request.getId());
462 OpflexError error = new OpflexError();
463 error.setCode(OpflexError.ErrorCode.ERROR.toString());
464 // error.setData(data);
465 // error.setMessage(message);
466 // error.setTrace(trace);
470 for (PolicyUnresolveRequest.Params p : req.getParams()) {
472 // Skip this if we already have an error
473 if (msg.getError() != null)
477 * Only Policy Identity or Policy URI is present.
478 * Convert to a URI that we'll use
480 policyUri = p.getPolicy_uri();
481 if (policyUri == null) {
482 // Convert the RN/name to DN
483 Uri rn = p.getPolicy_ident().getContext();
484 String name = p.getPolicy_ident().getName();
485 PolicyUri puri = new PolicyUri(rn.getValue());
487 policyUri = puri.getUri();
490 if (MessageUtils.hasEpg(policyUri.getValue())) {
492 * Keep track of EPGs requested by agents.
494 EndpointGroupId egid = new EndpointGroupId(
495 MessageUtils.getEndpointGroupFromUri(policyUri.getValue()));
496 TenantId tid = new TenantId(MessageUtils.getTenantFromUri(policyUri.getValue()));
497 EgKey epgId = new EgKey(tid, egid);
499 removePolicySubscription(endpoint, epgId);
501 OpflexError error = new OpflexError();
502 error.setMessage(UKNOWN_POLICY);
509 if (response != null) {
511 endpoint.sendResponse(response);
512 } catch (Throwable t) {
513 LOG.warn("Error sending response {}", t);