Feature uses features-parent as parent
[groupbasedpolicy.git] / renderers / opflex / src / main / java / org / opendaylight / groupbasedpolicy / renderer / opflex / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  * 
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.opflex;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
20
21 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.JsonRpcEndpoint;
22 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcBroker;
23 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessage;
24 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessageMap;
25 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent;
26 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService;
27 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role;
28 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.ManagedObject;
29 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexError;
30 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveRequest;
31 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveResponse;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveRequest;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveResponse;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUpdateRequest;
35 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.MitLib;
36 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.PolicyUri;
37 import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
38 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
39 import org.opendaylight.groupbasedpolicy.resolver.IndexedTenant;
40 import org.opendaylight.groupbasedpolicy.resolver.Policy;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
45 import org.opendaylight.groupbasedpolicy.resolver.RuleGroup;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ConditionName;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.Contract;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.collect.Lists;
55 import com.google.common.collect.Sets;
56
57 /**
58  * Manage policies on agents by subscribing to updates from the
59  * policy resolver and information about endpoints from the endpoint
60  * registry
61  * 
62  * @author tbachman
63  */
64 public class PolicyManager implements PolicyListener, RpcBroker.RpcCallback, AutoCloseable {
65
66     private static final Logger LOG = LoggerFactory.getLogger(PolicyManager.class);
67
68     private static final String UKNOWN_POLICY = "unknown policy name";
69
70     /*
71      * The tables below are used to look up Managed Objects (MOs)
72      * that have been subscribed to. The table is indexed as
73      * <String:Managed Object DN> <String:agent ID> <Policy:policy>
74      */
75     final PolicyResolver policyResolver;
76     final OpflexConnectionService connectionService;
77     final ScheduledExecutorService executor;
78     private final MitLib mitLibrary;
79     private final PolicyScope policyScope;
80
81     private final ConcurrentHashMap<EgKey, Set<String>> epgSubscriptions = new ConcurrentHashMap<>();
82     private RpcMessageMap messageMap = null;
83
84     public PolicyManager(PolicyResolver policyResolver, OpflexConnectionService connectionService,
85             ScheduledExecutorService executor, MitLib mitLibrary) {
86         super();
87         this.executor = executor;
88         this.policyResolver = policyResolver;
89         this.connectionService = connectionService;
90         this.mitLibrary = mitLibrary;
91
92         /* Subscribe to PR messages */
93         messageMap = new RpcMessageMap();
94         List<RpcMessage> messages = Role.POLICY_REPOSITORY.getMessages();
95         messageMap.addList(messages);
96         for (RpcMessage msg : messages) {
97             this.connectionService.subscribe(msg, this);
98         }
99
100         policyScope = policyResolver.registerListener(this);
101
102         LOG.debug("Initialized OpFlex policy manager");
103     }
104
105     /**
106      * Shut down the {@link PolicyManager}. Implemented from the
107      * AutoCloseable interface.
108      */
109     @Override
110     public void close() throws ExecutionException, InterruptedException {
111
112     }
113
114     // **************
115     // PolicyListener
116     // **************
117
118     @Override
119     public void policyUpdated(Set<EgKey> updatedConsumers) {
120
121         sendPolicyUpdates(updatedConsumers);
122     }
123
124     // **************
125     // Implementation
126     // **************
127
128     /**
129      * Update all policy on all agents as needed. Note that this will block
130      * one of the threads on the executor.
131      * 
132      * @author tbachman
133      */
134     private void sendPolicyUpdates(Set<EgKey> updatedConsumers) {
135         Map<String, Set<EgKey>> agentMap = new HashMap<String, Set<EgKey>>();
136
137         PolicyInfo info = policyResolver.getCurrentPolicy();
138         if (info == null)
139             return;
140
141         /*
142          * First build a per-agent set of EPGs that need updating
143          */
144         for (EgKey cepg : updatedConsumers) {
145
146             /*
147              * Find the set of agents that have subscribed to
148              * updates for this EPG
149              */
150             for (String agentId : epgSubscriptions.get(cepg)) {
151                 Set<EgKey> egSet = agentMap.get(agentId);
152                 if (egSet == null) {
153                     egSet = Collections.newSetFromMap(new ConcurrentHashMap<EgKey, Boolean>());
154                     agentMap.put(agentId, egSet);
155                 }
156                 egSet.add(cepg);
157             }
158         }
159
160         /*
161          * Go through each agent and provide a single update for all EPGs
162          */
163         for (Map.Entry<String, Set<EgKey>> entry : agentMap.entrySet()) {
164             OpflexAgent agent = connectionService.getOpflexAgent(entry.getKey());
165             if (agent == null)
166                 continue;
167
168             sendPolicyUpdate(agent.getEndpoint(), entry.getValue(), info);
169
170         }
171     }
172
173     /**
174      * This implements Runnable, which allows the {@link ScheduledExecutorService} to execute the
175      * run() method to implement the update
176      *
177      * @author tbachman
178      */
179     private class PolicyUpdate implements Runnable {
180
181         private final JsonRpcEndpoint agent;
182         private final Set<EgKey> epgSet;
183         private final PolicyInfo info;
184
185         PolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
186             this.agent = agent;
187             this.epgSet = epgSet;
188             this.info = info;
189         }
190
191         @Override
192         public void run() {
193             List<ManagedObject> subtrees = new ArrayList<ManagedObject>();
194
195             PolicyUpdateRequest request = new PolicyUpdateRequest();
196             List<PolicyUpdateRequest.Params> paramsList = new ArrayList<>();
197             PolicyUpdateRequest.Params params = new PolicyUpdateRequest.Params();
198
199             /*
200              * We may need to optimize this in the future. Currently
201              * we send down the EPG MOs and all the related policy
202              * that's in scope from the PolicyResolver. If we want
203              * to optimize this in the future to only send the policy
204              * objects that changed, we'd either have to change the
205              * PolicyResolver to provide this delta, or we'd have to
206              * keep cached state for each node.
207              */
208             for (EgKey epg : epgSet) {
209                 /*
210                  * Get EPGs from the IndexedTenant, as the EPGs from
211                  * the IndexedTenenat alread has collapsed the EPGs
212                  * (i.e. inheritance accounted for)
213                  * 
214                  * TODO: needed?
215                  */
216
217                 IndexedTenant it = policyResolver.getTenant(epg.getTenantId());
218                 List<ManagedObject> relatedMos = getPolicy(epg, info, it);
219                 subtrees.addAll(relatedMos);
220             }
221
222             /*
223              * Currently not using delete URI or merge_children MOs
224              */
225             params.setDelete_uri(new ArrayList<Uri>());
226             params.setMerge_children(new ArrayList<ManagedObject>());
227             params.setReplace(subtrees);
228             paramsList.add(params);
229             request.setParams(paramsList);
230             try {
231                 agent.sendRequest(request);
232             } catch (Exception e) {
233
234             }
235         }
236     }
237
238     void sendPolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
239         executor.execute(new PolicyUpdate(agent, epgSet, info));
240     }
241
242     /**
243      * This method creates {@link ManagedObject} POJOs for all of the
244      * policy objects that need to be sent as a result of policy
245      * resolution for the given EPG.
246      *
247      * @param epg The Endpoint Group that was resolved
248      * @param policySnapshot A snapshot of the current resolved policy
249      */
250     private List<ManagedObject> getPolicy(EgKey epg, PolicyInfo policySnapshot, IndexedTenant it) {
251         if (policySnapshot == null)
252             return null;
253
254         Set<ManagedObject> policyMos = Sets.newHashSet();
255         Set<EgKey> peers = policySnapshot.getPeers(epg);
256
257         if (peers == null || peers.size() <= 0)
258             return null;
259
260         // Allocate an MO for the requested EPG
261         ManagedObject epgMo = new ManagedObject();
262         for (EgKey depg : peers) {
263             /*
264              * Construct the base URI, so that we can
265              * continue adding on to create child MOs.
266              * We use the peer EPG for getting the policy
267              */
268             PolicyUri uri = new PolicyUri();
269             uri.push(MessageUtils.TENANTS_RN);
270             uri.push(MessageUtils.TENANT_RN);
271             uri.push(depg.getTenantId().getValue());
272
273             Policy policy = policySnapshot.getPolicy(epg, depg);
274             if (policy == null || policy == Policy.EMPTY)
275                 continue;
276
277             /*
278              * We now have a policy that we need to send to the agent.
279              * Provide empty condition lists for now - need to be
280              * an actual empty list, instead of null
281              * 
282              * TODO: get actual condition groups
283              */
284             List<ConditionName> conds = new ArrayList<ConditionName>();
285             ConditionGroup cgSrc = policySnapshot.getEgCondGroup(epg, conds);
286             ConditionGroup cgDst = policySnapshot.getEgCondGroup(depg, conds);
287             List<RuleGroup> rgl = policy.getRules(cgSrc, cgDst);
288
289             /*
290              * RuleGroups can refer to the same contract. As result,
291              * we need to keep track of contracts returned and merge
292              * the results into a single ManagedObject
293              */
294             Map<Contract, ManagedObject> contracts = new ConcurrentHashMap<Contract, ManagedObject>();
295
296             for (RuleGroup rg : rgl) {
297
298                 /*
299                  * Construct a new URI for the EPG requested.
300                  * In this case, we want the requested EPG, not
301                  * the peer EPG
302                  */
303                 PolicyUri puri = new PolicyUri();
304                 puri.push(MessageUtils.TENANTS_RN);
305                 puri.push(MessageUtils.TENANT_RN);
306                 puri.push(epg.getTenantId().getValue());
307                 puri.push(MessageUtils.EPG_RN);
308                 puri.push(epg.getEgId().getValue());
309                 Set<ManagedObject> epgMos = MessageUtils.getEndpointGroupMo(epgMo, puri,
310                         it.getEndpointGroup(epg.getEgId()), rg);
311                 if (epgMos != null) {
312                     policyMos.addAll(epgMos);
313                 }
314
315                 Contract c = rg.getRelatedContract();
316                 /*
317                  * This cmol list is used as a container to pass
318                  * an out parameter for the contract MO. This MO
319                  * is returned separately from the others because
320                  * may require merging -- different RuleGroup
321                  * objects can refer to the same contract
322                  */
323                 List<ManagedObject> cmol = new ArrayList<>();
324
325                 uri.push(MessageUtils.CONTRACT_RN);
326                 uri.push(c.getId().getValue());
327                 List<ManagedObject> mol = MessageUtils.getContractAndSubMos(cmol, uri, c, rg, it);
328                 if (mol == null)
329                     continue;
330
331                 // walk back to the tenant for next contract URI
332                 uri.pop();
333                 uri.pop();
334
335                 if (contracts.get(c) != null) {
336                     /*
337                      * Aggregate the child URIs and properties.
338                      */
339                     MessageUtils.mergeMos(contracts.get(c), cmol.remove(0));
340                 } else {
341                     contracts.put(c, cmol.remove(0));
342                 }
343                 policyMos.addAll(mol);
344             }
345             // add in the EPG
346             policyMos.add(epgMo);
347             // add in the contracts
348             policyMos.addAll(contracts.values());
349         }
350         return Lists.newArrayList(policyMos);
351     }
352
353     private void addPolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
354         policyScope.addToScope(epgId.getTenantId(), epgId.getEgId());
355
356         Set<String> agents = epgSubscriptions.get(epgId);
357         if (agents == null) {
358             agents = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
359             Set<String> result = epgSubscriptions.putIfAbsent(epgId, agents);
360             if (result != null) {
361                 agents = result;
362             }
363         }
364         agents.add(endpoint.getIdentifier());
365
366     }
367
368     private void removePolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
369         Set<String> agents = epgSubscriptions.get(epgId);
370         if (agents != null) {
371             agents.remove(endpoint.getIdentifier());
372         }
373         policyScope.removeFromScope(epgId.getTenantId(), epgId.getEgId());
374     }
375
376     @Override
377     public void callback(JsonRpcEndpoint endpoint, RpcMessage request) {
378
379         if (messageMap.get(request.getMethod()) == null) {
380             LOG.warn("message {} was not subscribed to, but was delivered.", request);
381             return;
382         }
383
384         RpcMessage response = null;
385
386         if (request instanceof PolicyResolveRequest) {
387             PolicyResolveRequest req = (PolicyResolveRequest) request;
388             PolicyResolveResponse msg = new PolicyResolveResponse();
389             PolicyResolveResponse.Result result = new PolicyResolveResponse.Result();
390             msg.setId(request.getId());
391
392             if (!req.valid()) {
393                 LOG.warn("Invalid resolve request: {}", req);
394                 OpflexError error = new OpflexError();
395                 error.setCode(OpflexError.ErrorCode.ERROR.toString());
396                 // error.setData(data);
397                 // error.setMessage(message);
398                 // error.setTrace(trace);
399                 msg.setError(error);
400             }
401
402             List<ManagedObject> mol = new ArrayList<>();
403
404             for (PolicyResolveRequest.Params p : req.getParams()) {
405
406                 // Skip this if we already have an error
407                 if (msg.getError() != null)
408                     break;
409
410                 /*
411                  * Only Policy Identity or Policy URI is present.
412                  * Convert Policy Identities to a URI that we can use
413                  */
414                 Uri policyUri = p.getPolicy_uri();
415                 if (policyUri == null) {
416                     Uri rn = p.getPolicy_ident().getContext();
417                     String name = p.getPolicy_ident().getName();
418                     PolicyUri puri = new PolicyUri(rn.getValue());
419                     puri.push(name);
420                     policyUri = puri.getUri();
421                 }
422
423                 // See if the request has an EPG in the URI
424                 if (MessageUtils.hasEpg(policyUri.getValue())) {
425                     /*
426                      * Keep track of EPGs requested by agents.
427                      */
428                     EndpointGroupId egid = new EndpointGroupId(
429                             MessageUtils.getEndpointGroupFromUri(policyUri.getValue()));
430                     TenantId tid = new TenantId(MessageUtils.getTenantFromUri(policyUri.getValue()));
431                     EgKey epgId = new EgKey(tid, egid);
432
433                     addPolicySubscription(endpoint, epgId);
434
435                     IndexedTenant it = policyResolver.getTenant(tid);
436                     if (it != null) {
437                         List<ManagedObject> relatedMos = getPolicy(epgId, policyResolver.getCurrentPolicy(), it);
438                         if (relatedMos != null) {
439                             mol.addAll(relatedMos);
440                         }
441                     }
442                 } else {
443                     OpflexError error = new OpflexError();
444                     error.setMessage(UKNOWN_POLICY);
445                     error.setCode(OpflexError.ErrorCode.EUNSUPPORTED.toString());
446                     // error.setData(data);
447                     // error.setTrace(trace);
448                     msg.setError(error);
449                 }
450
451             }
452             result.setPolicy(mol);
453             msg.setResult(result);
454             response = msg;
455         } else if (request instanceof PolicyUnresolveRequest) {
456             PolicyUnresolveRequest req = (PolicyUnresolveRequest) request;
457             PolicyUnresolveResponse msg = new PolicyUnresolveResponse();
458             msg.setId(request.getId());
459             Uri policyUri;
460
461             if (!req.valid()) {
462                 OpflexError error = new OpflexError();
463                 error.setCode(OpflexError.ErrorCode.ERROR.toString());
464                 // error.setData(data);
465                 // error.setMessage(message);
466                 // error.setTrace(trace);
467                 msg.setError(error);
468             }
469
470             for (PolicyUnresolveRequest.Params p : req.getParams()) {
471
472                 // Skip this if we already have an error
473                 if (msg.getError() != null)
474                     break;
475
476                 /*
477                  * Only Policy Identity or Policy URI is present.
478                  * Convert to a URI that we'll use
479                  */
480                 policyUri = p.getPolicy_uri();
481                 if (policyUri == null) {
482                     // Convert the RN/name to DN
483                     Uri rn = p.getPolicy_ident().getContext();
484                     String name = p.getPolicy_ident().getName();
485                     PolicyUri puri = new PolicyUri(rn.getValue());
486                     puri.push(name);
487                     policyUri = puri.getUri();
488                 }
489
490                 if (MessageUtils.hasEpg(policyUri.getValue())) {
491                     /*
492                      * Keep track of EPGs requested by agents.
493                      */
494                     EndpointGroupId egid = new EndpointGroupId(
495                             MessageUtils.getEndpointGroupFromUri(policyUri.getValue()));
496                     TenantId tid = new TenantId(MessageUtils.getTenantFromUri(policyUri.getValue()));
497                     EgKey epgId = new EgKey(tid, egid);
498
499                     removePolicySubscription(endpoint, epgId);
500                 } else {
501                     OpflexError error = new OpflexError();
502                     error.setMessage(UKNOWN_POLICY);
503                     msg.setError(error);
504                 }
505             }
506             response = msg;
507
508         }
509         if (response != null) {
510             try {
511                 endpoint.sendResponse(response);
512             } catch (Exception e) {
513                 LOG.warn("Error sending response {}", e);
514             }
515         }
516
517     }
518 }