2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.opflex;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.List;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
21 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
22 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
23 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
24 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
25 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent;
26 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService;
27 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role;
28 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.ManagedObject;
29 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexError;
30 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveRequest;
31 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveResponse;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveRequest;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveResponse;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUpdateRequest;
35 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.MitLib;
36 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.PolicyUri;
37 import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
38 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
39 import org.opendaylight.groupbasedpolicy.resolver.IndexedTenant;
40 import org.opendaylight.groupbasedpolicy.resolver.Policy;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
45 import org.opendaylight.groupbasedpolicy.resolver.RuleGroup;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ConditionName;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.Contract;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 import com.google.common.collect.Lists;
56 import com.google.common.collect.Sets;
60 * Manage policies on agents by subscribing to updates from the
61 * policy resolver and information about endpoints from the endpoint
65 public class PolicyManager
66 implements PolicyListener, RpcBroker.RpcCallback, AutoCloseable {
67 private static final Logger LOG =
68 LoggerFactory.getLogger(PolicyManager.class);
70 private static final String UKNOWN_POLICY = "unknown policy name";
73 * The tables below are used to look up Managed Objects (MOs)
74 * that have been subscribed to. The table is indexed as
75 * <String:Managed Object DN> <String:agent ID> <Policy:policy>
77 final PolicyResolver policyResolver;
78 final OpflexConnectionService connectionService;
79 final ScheduledExecutorService executor;
80 private final MitLib mitLibrary;
81 private final PolicyScope policyScope;
83 private ConcurrentHashMap<EgKey, Set<String>> epgSubscriptions;
84 private RpcMessageMap messageMap = null;
87 public PolicyManager(PolicyResolver policyResolver,
88 OpflexConnectionService connectionService,
89 ScheduledExecutorService executor,
92 this.executor = executor;
93 this.policyResolver = policyResolver;
94 this.connectionService = connectionService;
95 this.mitLibrary = mitLibrary;
97 epgSubscriptions = new ConcurrentHashMap<>();
99 /* Subscribe to PR messages */
100 messageMap = new RpcMessageMap();
101 List<RpcMessage> messages = Role.POLICY_REPOSITORY.getMessages();
102 messageMap.addList(messages);
103 for (RpcMessage msg: messages) {
104 this.connectionService.subscribe(msg, this);
107 policyScope = policyResolver.registerListener(this);
109 LOG.debug("Initialized OpFlex policy manager");
113 * Shut down the {@link PolicyManager}. Implemented from the
114 * AutoCloseable interface.
// Implements AutoCloseable.close() for this manager (the class declares AutoCloseable).
// NOTE(review): the method body is not visible in this excerpt -- confirm what, if
// anything, is released here (e.g. the RPC subscriptions made in the constructor).
117 public void close() throws ExecutionException, InterruptedException {
126 public void policyUpdated(Set<EgKey> updatedConsumers) {
128 sendPolicyUpdates(updatedConsumers);
132 * Set the learning mode to the specified value
133 * @param learningMode the learning mode to set
// NOTE(review): only the signature is visible in this excerpt, and none of the fields
// declared on this class stores a LearningMode -- confirm whether this is intentionally a no-op.
135 public void setLearningMode(LearningMode learningMode) {
144 * Update all policy on all agents as needed. Note that this will block
145 * one of the threads on the executor.
148 private void sendPolicyUpdates(Set<EgKey> updatedConsumers) {
149 Map<String, Set<EgKey>> agentMap = new HashMap<String, Set<EgKey>>();
151 PolicyInfo info = policyResolver.getCurrentPolicy();
152 if (info == null) return;
155 * First build a per-agent set of EPGs that need updating
157 for (EgKey cepg: updatedConsumers) {
160 * Find the set of agents that have subscribed to
161 * updates for this EPG
163 for (String agentId: epgSubscriptions.get(cepg)) {
164 Set<EgKey> egSet = agentMap.get(agentId);
168 newSetFromMap(new ConcurrentHashMap<EgKey, Boolean>());
169 agentMap.put(agentId, egSet);
176 * Go through each agent and provide a single update for all EPGs
178 for (Map.Entry<String,Set<EgKey>> entry: agentMap.entrySet()) {
179 OpflexAgent agent = connectionService.
180 getOpflexAgent(entry.getKey());
181 if (agent == null) continue;
183 sendPolicyUpdate(agent.getEndpoint(), entry.getValue(), info);
189 * This implements Runnable, which allows the {@link ScheduledExecutorService}
190 * to execute the run() method to implement the update
195 private class PolicyUpdate implements Runnable {
196 private final JsonRpcEndpoint agent;
197 private final Set<EgKey> epgSet;
198 private final PolicyInfo info;
200 PolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
202 this.epgSet = epgSet;
208 List<ManagedObject> subtrees =
209 new ArrayList<ManagedObject>();
211 PolicyUpdateRequest request =
212 new PolicyUpdateRequest();
213 List<PolicyUpdateRequest.Params> paramsList =
215 PolicyUpdateRequest.Params params =
216 new PolicyUpdateRequest.Params();
219 * We may need to optimize this in the future. Currently
220 * we send down the EPG MOs and all the related policy
221 * that's in scope from the PolicyResolver. If we want
222 * to optimize this in the future to only send the policy
223 * objects that changed, we'd either have to change the
224 * PolicyResolver to provide this delta, or we'd have to
225 * keep cached state for each node.
227 for (EgKey epg: epgSet) {
229 * Get EPGs from the IndexedTenant, as the EPGs from
230 * the IndexedTenenat alread has collapsed the EPGs
231 * (i.e. inheritance accounted for)
236 IndexedTenant it = policyResolver.getTenant(epg.getTenantId());
237 List<ManagedObject> relatedMos = getPolicy(epg, info, it);
238 subtrees.addAll(relatedMos);
242 * Currently not using delete URI or merge_children MOs
244 params.setDelete_uri(new ArrayList<Uri>());
245 params.setMerge_children(new ArrayList<ManagedObject>());
246 params.setReplace(subtrees);
247 paramsList.add(params);
248 request.setParams(paramsList);
250 agent.sendRequest(request);
252 catch (Throwable t) {
258 void sendPolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
259 executor.execute(new PolicyUpdate(agent, epgSet, info));
266 * This method creates {@link ManagedObject} POJOs for all of the
267 * policy objects that need to be sent as a result of policy
268 * resolution for the given EPG.
270 * @param epg The Endpoint Group that was resolved
271 * @param policySnapshot A snapshot of the current resolved policy
273 private List<ManagedObject> getPolicy(EgKey epg,
274 PolicyInfo policySnapshot, IndexedTenant it) {
275 if (policySnapshot == null) return null;
277 Set<ManagedObject> policyMos = Sets.newHashSet();
278 Set<EgKey> peers = policySnapshot.getPeers(epg);
280 if (peers == null || peers.size() <= 0) return null;
282 // Allocate an MO for the requested EPG
283 ManagedObject epgMo = new ManagedObject();
284 for (EgKey depg: peers) {
286 * Construct the base URI, so that we can
287 * continue adding on to create child MOs.
288 * We use the peer EPG for getting the policy
290 PolicyUri uri = new PolicyUri();
291 uri.push(MessageUtils.TENANTS_RN);
292 uri.push(MessageUtils.TENANT_RN);
293 uri.push(depg.getTenantId().getValue());
295 Policy policy = policySnapshot.getPolicy(epg, depg);
296 if (policy == null || policy == Policy.EMPTY) continue;
299 * We now have a policy that we need to send to the agent.
300 * Provide empty condition lists for now - need to be
301 * an actual empty list, instead of null
303 * TODO: get actual condition groups
305 List<ConditionName> conds = new ArrayList<ConditionName>();
306 ConditionGroup cgSrc = policySnapshot.getEgCondGroup(epg, conds);
307 ConditionGroup cgDst = policySnapshot.getEgCondGroup(depg, conds);
308 List<RuleGroup> rgl = policy.getRules(cgSrc, cgDst);
311 * RuleGroups can refer to the same contract. As result,
312 * we need to keep track of contracts returned and merge
313 * the results into a single ManagedObject
315 Map<Contract, ManagedObject> contracts =
316 new ConcurrentHashMap<Contract, ManagedObject>();
318 for (RuleGroup rg: rgl) {
321 * Construct a new URI for the EPG requested.
322 * In this case, we want the requested EPG, not
325 PolicyUri puri = new PolicyUri();
326 puri.push(MessageUtils.TENANTS_RN);
327 puri.push(MessageUtils.TENANT_RN);
328 puri.push(epg.getTenantId().getValue());
329 puri.push(MessageUtils.EPG_RN);
330 puri.push(epg.getEgId().getValue());
331 Set<ManagedObject> epgMos = MessageUtils.
332 getEndpointGroupMo(epgMo,
334 it.getEndpointGroup(epg.getEgId()),
336 if (epgMos != null) {
337 policyMos.addAll(epgMos);
341 Contract c = rg.getRelatedContract();
343 * This cmol list is used as a container to pass
344 * an out parameter for the contract MO. This MO
345 * is returned separately from the others because
346 * may require merging -- different RuleGroup
347 * objects can refer to the same contract
349 List<ManagedObject> cmol = new ArrayList<ManagedObject>();
350 List<ManagedObject> mol = null;
352 uri.push(MessageUtils.CONTRACT_RN);
353 uri.push(c.getId().getValue());
354 mol = MessageUtils.getContractAndSubMos(cmol, uri, c, rg, it);
355 if (mol == null) continue;
357 // walk back to the tenant for next contract URI
358 uri.pop(); uri.pop();
360 if(contracts.get(c) != null) {
362 * Aggregate the child URIs and properties.
364 MessageUtils.mergeMos(contracts.get(c), cmol.remove(0));
367 contracts.put(c, cmol.remove(0));
369 policyMos.addAll(mol);
372 policyMos.add(epgMo);
373 // add in the contracts
374 policyMos.addAll(contracts.values());
376 return Lists.newArrayList(policyMos);
379 private void addPolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
380 policyScope.addToScope(epgId.getTenantId(), epgId.getEgId());
382 Set<String> agents = epgSubscriptions.get(epgId);
383 if (agents == null) {
384 agents = Collections.
385 newSetFromMap(new ConcurrentHashMap<String, Boolean>());
386 Set<String> result = epgSubscriptions.putIfAbsent(epgId, agents);
387 if (result != null) {
391 agents.add(endpoint.getIdentifier());
// Removes the agent's identifier from the subscriber set recorded for epgId (when such a
// set exists) and calls policyScope.removeFromScope for the EPG's tenant and group id.
// NOTE(review): no visible line checks whether other agents are still subscribed to epgId
// before removeFromScope is called -- confirm PolicyScope tolerates this (e.g. by reference
// counting); otherwise the remaining subscribers may stop receiving updates for this EPG.
395 private void removePolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
396 Set<String> agents = epgSubscriptions.get(epgId);
397 if (agents != null) {
398 agents.remove(endpoint.getIdentifier());
400 policyScope.removeFromScope(epgId.getTenantId(), epgId.getEgId());
404 public void callback(JsonRpcEndpoint endpoint, RpcMessage request) {
406 if (messageMap.get(request.getMethod()) == null) {
407 LOG.warn("message {} was not subscribed to, but was delivered.", request);
411 RpcMessage response = null;
413 if (request instanceof PolicyResolveRequest) {
414 PolicyResolveRequest req = (PolicyResolveRequest)request;
415 PolicyResolveResponse msg = new PolicyResolveResponse();
416 PolicyResolveResponse.Result result = new PolicyResolveResponse.Result();
417 msg.setId(request.getId());
420 LOG.warn("Invalid resolve request: {}", req);
421 OpflexError error = new OpflexError();
422 error.setCode(OpflexError.ErrorCode.ERROR.toString());
423 //error.setData(data);
424 //error.setMessage(message);
425 //error.setTrace(trace);
429 Uri policyUri = null;
430 List<ManagedObject> mol = new ArrayList<ManagedObject>();
432 for (PolicyResolveRequest.Params p: req.getParams()) {
434 // Skip this if we already have an error
435 if (msg.getError() != null) break;
438 * Only Policy Identity or Policy URI is present.
439 * Convert Policy Identities to a URI that we can use
441 policyUri = p.getPolicy_uri();
442 if (policyUri == null) {
443 Uri rn = p.getPolicy_ident().getContext();
444 String name = p.getPolicy_ident().getName();
445 PolicyUri puri = new PolicyUri(rn.getValue());
447 policyUri = puri.getUri();
450 // See if the request has an EPG in the URI
451 if (MessageUtils.hasEpg(policyUri.getValue())) {
453 * Keep track of EPGs requested by agents.
455 EndpointGroupId egid = new EndpointGroupId(MessageUtils.
456 getEndpointGroupFromUri(policyUri.getValue()));
457 TenantId tid = new TenantId(MessageUtils.
458 getTenantFromUri(policyUri.getValue()));
459 EgKey epgId = new EgKey(tid, egid);
461 addPolicySubscription(endpoint, epgId);
463 IndexedTenant it = policyResolver.getTenant(tid);
465 List<ManagedObject> relatedMos =
466 getPolicy(epgId, policyResolver.getCurrentPolicy(), it);
467 if (relatedMos != null) {
468 mol.addAll(relatedMos);
475 error.setMessage(UKNOWN_POLICY);
476 error.setCode(OpflexError.ErrorCode.EUNSUPPORTED.toString());
477 //error.setData(data);
478 //error.setTrace(trace);
483 result.setPolicy(mol);
484 msg.setResult(result);
487 else if (request instanceof PolicyUnresolveRequest) {
488 PolicyUnresolveRequest req = (PolicyUnresolveRequest)request;
489 PolicyUnresolveResponse msg = new PolicyUnresolveResponse();
490 msg.setId(request.getId());
494 OpflexError error = new OpflexError();
495 error.setCode(OpflexError.ErrorCode.ERROR.toString());
496 //error.setData(data);
497 //error.setMessage(message);
498 //error.setTrace(trace);
502 for (PolicyUnresolveRequest.Params p: req.getParams()) {
504 // Skip this if we already have an error
505 if (msg.getError() != null) break;
508 * Only Policy Identity or Policy URI is present.
509 * Convert to a URI that we'll use
511 policyUri = p.getPolicy_uri();
512 if (policyUri == null) {
513 // Convert the RN/name to DN
514 Uri rn = p.getPolicy_ident().getContext();
515 String name = p.getPolicy_ident().getName();
516 PolicyUri puri = new PolicyUri(rn.getValue());
518 policyUri = puri.getUri();
521 if (MessageUtils.hasEpg(policyUri.getValue())) {
523 * Keep track of EPGs requested by agents.
525 EndpointGroupId egid = new EndpointGroupId(MessageUtils.
526 getEndpointGroupFromUri(policyUri.getValue()));
527 TenantId tid = new TenantId(MessageUtils.
528 getTenantFromUri(policyUri.getValue()));
529 EgKey epgId = new EgKey(tid, egid);
531 removePolicySubscription(endpoint, epgId);
536 error.setMessage(UKNOWN_POLICY);
543 if (response != null) {
545 endpoint.sendResponse(response);
547 catch (Throwable t) {
548 LOG.warn("Error sending response {}", t);