Added range type to subject-feature-definition/parameter
[groupbasedpolicy.git] / groupbasedpolicy / src / main / java / org / opendaylight / groupbasedpolicy / renderer / opflex / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.opflex;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
20
21 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
22 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
23 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
24 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
25 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent;
26 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService;
27 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role;
28 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.ManagedObject;
29 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexError;
30 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveRequest;
31 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveResponse;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveRequest;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveResponse;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUpdateRequest;
35 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.MitLib;
36 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.PolicyUri;
37 import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
38 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
39 import org.opendaylight.groupbasedpolicy.resolver.IndexedTenant;
40 import org.opendaylight.groupbasedpolicy.resolver.Policy;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
45 import org.opendaylight.groupbasedpolicy.resolver.RuleGroup;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ConditionName;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.Contract;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import com.google.common.collect.Lists;
56 import com.google.common.collect.Sets;
57
58
59 /**
60  * Manage policies on agents by subscribing to updates from the
61  * policy resolver and information about endpoints from the endpoint
62  * registry
63  * @author tbachman
64  */
65 public class PolicyManager
66      implements PolicyListener, RpcBroker.RpcCallback, AutoCloseable  {
67     private static final Logger LOG =
68             LoggerFactory.getLogger(PolicyManager.class);
69
70     private static final String UKNOWN_POLICY = "unknown policy name";
71
72     /*
73      * The tables below are used to look up Managed Objects (MOs)
74      * that have been subscribed to. The table is indexed as
75      * <String:Managed Object DN> <String:agent ID> <Policy:policy>
76      */
77     final PolicyResolver policyResolver;
78     final OpflexConnectionService connectionService;
79     final ScheduledExecutorService executor;
80     private final MitLib mitLibrary;
81     private final PolicyScope policyScope;
82
83     private ConcurrentHashMap<EgKey, Set<String>> epgSubscriptions;
84     private RpcMessageMap messageMap = null;
85
86
87     public PolicyManager(PolicyResolver policyResolver,
88                          OpflexConnectionService connectionService,
89                          ScheduledExecutorService executor,
90                          MitLib mitLibrary) {
91         super();
92         this.executor = executor;
93         this.policyResolver = policyResolver;
94         this.connectionService = connectionService;
95         this.mitLibrary = mitLibrary;
96
97         epgSubscriptions = new ConcurrentHashMap<>();
98
99         /* Subscribe to PR messages */
100         messageMap = new RpcMessageMap();
101         List<RpcMessage> messages = Role.POLICY_REPOSITORY.getMessages();
102         messageMap.addList(messages);
103         for (RpcMessage msg: messages) {
104             this.connectionService.subscribe(msg, this);
105         }
106
107         policyScope = policyResolver.registerListener(this);
108
109         LOG.debug("Initialized OpFlex policy manager");
110     }
111
112     /**
113      * Shut down the {@link PolicyManager}. Implemented from the
114      * AutoCloseable interface.
115      */
116     @Override
117     public void close() throws ExecutionException, InterruptedException {
118
119     }
120
121     // **************
122     // PolicyListener
123     // **************
124
125     @Override
126     public void policyUpdated(Set<EgKey> updatedConsumers) {
127
128         sendPolicyUpdates(updatedConsumers);
129     }
130
131     /**
132      * Set the learning mode to the specified value
133      * @param learningMode the learning mode to set
134      */
135     public void setLearningMode(LearningMode learningMode) {
136         // No-op for now
137     }
138
139     // **************
140     // Implementation
141     // **************
142
143     /**
144      * Update all policy on all agents as needed.  Note that this will block
145      * one of the threads on the executor.
146      * @author tbachman
147      */
148     private void sendPolicyUpdates(Set<EgKey> updatedConsumers) {
149         Map<String, Set<EgKey>> agentMap = new HashMap<String, Set<EgKey>>();
150
151         PolicyInfo info = policyResolver.getCurrentPolicy();
152         if (info == null) return;
153
154         /*
155          * First build a per-agent set of EPGs that need updating
156          */
157         for (EgKey cepg: updatedConsumers) {
158
159                 /*
160                  * Find the set of agents that have subscribed to
161                  * updates for this EPG
162                  */
163             for (String agentId: epgSubscriptions.get(cepg)) {
164                 Set<EgKey> egSet = agentMap.get(agentId);
165                 if (egSet == null) {
166                     egSet =
167                           Collections.
168                           newSetFromMap(new ConcurrentHashMap<EgKey, Boolean>());
169                     agentMap.put(agentId, egSet);
170                 }
171                 egSet.add(cepg);
172             }
173         }
174
175         /*
176          * Go through each agent and provide a single update for all EPGs
177          */
178         for (Map.Entry<String,Set<EgKey>> entry: agentMap.entrySet()) {
179             OpflexAgent agent = connectionService.
180                     getOpflexAgent(entry.getKey());
181             if (agent == null) continue;
182
183             sendPolicyUpdate(agent.getEndpoint(), entry.getValue(), info);
184
185         }
186     }
187
188     /**
189      * This implements Runnable, which allows the {@link ScheduledExecutorservice}
190      * to execute the run() method to implement the update
191      *
192      * @author tbachman
193      *
194      */
195     private class PolicyUpdate implements Runnable {
196         private final JsonRpcEndpoint agent;
197         private final Set<EgKey> epgSet;
198         private final PolicyInfo info;
199
200         PolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
201             this.agent = agent;
202             this.epgSet = epgSet;
203             this.info = info;
204         }
205
206         @Override
207         public void run() {
208             List<ManagedObject> subtrees =
209                     new ArrayList<ManagedObject>();
210
211             PolicyUpdateRequest request =
212                     new PolicyUpdateRequest();
213             List<PolicyUpdateRequest.Params> paramsList =
214                     new ArrayList<>();
215             PolicyUpdateRequest.Params params =
216                     new PolicyUpdateRequest.Params();
217
218             /*
219              * We may need to optimize this in the future. Currently
220              * we send down the EPG MOs and all the related policy
221              * that's in scope from the PolicyResolver. If we want
222              * to optimize this in the future to only send the policy
223              * objects that changed, we'd either have to change the
224              * PolicyResolver to provide this delta, or we'd have to
225              * keep cached state for each node.
226              */
227             for (EgKey epg: epgSet) {
228                 /*
229                  * Get EPGs from the IndexedTenant, as the EPGs from
230                  * the IndexedTenenat alread has collapsed the EPGs
231                  * (i.e. inheritance accounted for)
232                  *
233                  * TODO: needed?
234                  */
235
236                 IndexedTenant it = policyResolver.getTenant(epg.getTenantId());
237                 List<ManagedObject> relatedMos = getPolicy(epg, info, it);
238                 subtrees.addAll(relatedMos);
239             }
240
241             /*
242              * Currently not using delete URI or merge_children MOs
243              */
244             params.setDelete_uri(new ArrayList<Uri>());
245             params.setMerge_children(new ArrayList<ManagedObject>());
246             params.setReplace(subtrees);
247             paramsList.add(params);
248             request.setParams(paramsList);
249             try {
250                 agent.sendRequest(request);
251             }
252             catch (Throwable t) {
253
254             }
255         }
256     }
257
258     void sendPolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
259         executor.execute(new PolicyUpdate(agent, epgSet, info));
260     }
261
262
263
264
265     /**
266      * This method creates {@link ManagedObject} POJOs for all of the
267      * policy objects that need to be sent as a result of policy
268      * resolution for the given EPG.
269      *
270      * @param epg The Endpoint Group that was resolved
271      * @param policySnapshot A snapshot of the current resolved policy
272      */
273     private List<ManagedObject> getPolicy(EgKey epg,
274             PolicyInfo policySnapshot, IndexedTenant it) {
275         if (policySnapshot == null) return null;
276
277         Set<ManagedObject> policyMos = Sets.newHashSet();
278         Set<EgKey> peers = policySnapshot.getPeers(epg);
279
280         if (peers == null || peers.size() <= 0) return null;
281
282         // Allocate an MO for the requested EPG
283         ManagedObject epgMo = new ManagedObject();
284         for (EgKey depg: peers) {
285                 /*
286                  * Construct the base URI, so that we can
287                  * continue adding on to create child MOs.
288                  * We use the peer EPG for getting the policy
289                  */
290             PolicyUri uri = new PolicyUri();
291             uri.push(MessageUtils.TENANTS_RN);
292             uri.push(MessageUtils.TENANT_RN);
293             uri.push(depg.getTenantId().getValue());
294
295             Policy policy = policySnapshot.getPolicy(epg, depg);
296             if (policy == null || policy == Policy.EMPTY) continue;
297
298             /*
299              * We now have a policy that we need to send to the agent.
300              * Provide empty condition lists for now - need to be
301              * an actual empty list, instead of null
302              *
303              * TODO: get actual condition groups
304              */
305             List<ConditionName> conds = new ArrayList<ConditionName>();
306             ConditionGroup cgSrc = policySnapshot.getEgCondGroup(epg, conds);
307             ConditionGroup cgDst = policySnapshot.getEgCondGroup(depg, conds);
308             List<RuleGroup> rgl = policy.getRules(cgSrc, cgDst);
309
310             /*
311              * RuleGroups can refer to the same contract. As result,
312              * we need to keep track of contracts returned and merge
313              * the results into a single ManagedObject
314              */
315             Map<Contract, ManagedObject> contracts =
316                     new ConcurrentHashMap<Contract, ManagedObject>();
317
318             for (RuleGroup rg: rgl) {
319
320                 /*
321                  * Construct a new URI for the EPG requested.
322                  * In this case, we want the requested EPG, not
323                  * the peer EPG
324                  */
325                 PolicyUri puri = new PolicyUri();
326                 puri.push(MessageUtils.TENANTS_RN);
327                 puri.push(MessageUtils.TENANT_RN);
328                 puri.push(epg.getTenantId().getValue());
329                 puri.push(MessageUtils.EPG_RN);
330                 puri.push(epg.getEgId().getValue());
331                 Set<ManagedObject> epgMos = MessageUtils.
332                             getEndpointGroupMo(epgMo,
333                                                puri,
334                                                it.getEndpointGroup(epg.getEgId()),
335                                                rg);
336                 if (epgMos != null) {
337                     policyMos.addAll(epgMos);
338                 }
339
340
341                 Contract c = rg.getRelatedContract();
342                 /*
343                  * This cmol list is used as a container to pass
344                  * an out parameter for the contract MO. This MO
345                  * is returned separately from the others because
346                  * may require merging -- different RuleGroup
347                  * objects can refer to the same contract
348                  */
349                 List<ManagedObject> cmol = new ArrayList<ManagedObject>();
350                 List<ManagedObject> mol = null;
351
352                 uri.push(MessageUtils.CONTRACT_RN);
353                 uri.push(c.getId().getValue());
354                 mol = MessageUtils.getContractAndSubMos(cmol, uri, c, rg, it);
355                 if (mol == null) continue;
356
357                 // walk back to the tenant for next contract URI
358                 uri.pop(); uri.pop();
359
360                 if(contracts.get(c) != null) {
361                     /*
362                      * Aggregate the child URIs and properties.
363                      */
364                         MessageUtils.mergeMos(contracts.get(c), cmol.remove(0));
365                 }
366                 else {
367                     contracts.put(c, cmol.remove(0));
368                 }
369                 policyMos.addAll(mol);
370             }
371             // add in the EPG
372             policyMos.add(epgMo);
373             // add in the contracts
374             policyMos.addAll(contracts.values());
375         }
376         return Lists.newArrayList(policyMos);
377     }
378
379     private void addPolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
380         policyScope.addToScope(epgId.getTenantId(), epgId.getEgId());
381
382         Set<String> agents = epgSubscriptions.get(epgId);
383         if (agents == null) {
384             agents = Collections.
385                     newSetFromMap(new ConcurrentHashMap<String, Boolean>());
386             Set<String> result = epgSubscriptions.putIfAbsent(epgId, agents);
387             if (result != null) {
388                 agents = result;
389             }
390         }
391         agents.add(endpoint.getIdentifier());
392
393     }
394
395     private void removePolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
396         Set<String> agents = epgSubscriptions.get(epgId);
397         if (agents != null) {
398             agents.remove(endpoint.getIdentifier());
399         }
400         policyScope.removeFromScope(epgId.getTenantId(), epgId.getEgId());
401     }
402
403     @Override
404     public void callback(JsonRpcEndpoint endpoint, RpcMessage request) {
405
406         if (messageMap.get(request.getMethod()) == null) {
407             LOG.warn("message {} was not subscribed to, but was delivered.", request);
408             return;
409         }
410
411         RpcMessage response = null;
412
413         if (request instanceof PolicyResolveRequest) {
414             PolicyResolveRequest req = (PolicyResolveRequest)request;
415             PolicyResolveResponse msg = new PolicyResolveResponse();
416             PolicyResolveResponse.Result result = new PolicyResolveResponse.Result();
417             msg.setId(request.getId());
418
419             if (!req.valid()) {
420                 LOG.warn("Invalid resolve request: {}", req);
421                 OpflexError error = new OpflexError();
422                 error.setCode(OpflexError.ErrorCode.ERROR.toString());
423                 //error.setData(data);
424                 //error.setMessage(message);
425                 //error.setTrace(trace);
426                 msg.setError(error);
427             }
428
429             Uri policyUri = null;
430                 List<ManagedObject> mol = new ArrayList<ManagedObject>();
431
432             for (PolicyResolveRequest.Params p: req.getParams()) {
433
434                 // Skip this if we already have an error
435                 if (msg.getError() != null) break;
436
437                 /*
438                  * Only Policy Identity or Policy URI is present.
439                  * Convert Policy Identities to a URI that we can use
440                  */
441                 policyUri = p.getPolicy_uri();
442                 if (policyUri == null) {
443                         Uri rn = p.getPolicy_ident().getContext();
444                         String name = p.getPolicy_ident().getName();
445                         PolicyUri puri = new PolicyUri(rn.getValue());
446                         puri.push(name);
447                         policyUri = puri.getUri();
448                 }
449
450                 // See if the request has an EPG in the URI
451                 if (MessageUtils.hasEpg(policyUri.getValue())) {
452                     /*
453                      * Keep track of EPGs requested by agents.
454                      */
455                     EndpointGroupId egid = new EndpointGroupId(MessageUtils.
456                             getEndpointGroupFromUri(policyUri.getValue()));
457                     TenantId tid = new TenantId(MessageUtils.
458                             getTenantFromUri(policyUri.getValue()));
459                     EgKey epgId = new EgKey(tid, egid);
460
461                     addPolicySubscription(endpoint, epgId);
462
463                     IndexedTenant it = policyResolver.getTenant(tid);
464                     if (it != null) {
465                         List<ManagedObject> relatedMos =
466                                 getPolicy(epgId, policyResolver.getCurrentPolicy(), it);
467                         if (relatedMos != null) {
468                             mol.addAll(relatedMos);
469                         }
470                     }
471                 }
472                 else {
473                     OpflexError error =
474                             new OpflexError();
475                     error.setMessage(UKNOWN_POLICY);
476                     error.setCode(OpflexError.ErrorCode.EUNSUPPORTED.toString());
477                     //error.setData(data);
478                     //error.setTrace(trace);
479                     msg.setError(error);
480                 }
481
482             }
483             result.setPolicy(mol);
484             msg.setResult(result);
485             response = msg;
486         }
487         else if (request instanceof PolicyUnresolveRequest) {
488             PolicyUnresolveRequest req = (PolicyUnresolveRequest)request;
489             PolicyUnresolveResponse msg = new PolicyUnresolveResponse();
490             msg.setId(request.getId());
491             Uri policyUri;
492
493             if (!req.valid()) {
494                 OpflexError error = new OpflexError();
495                 error.setCode(OpflexError.ErrorCode.ERROR.toString());
496                 //error.setData(data);
497                 //error.setMessage(message);
498                 //error.setTrace(trace);
499                 msg.setError(error);
500             }
501
502             for (PolicyUnresolveRequest.Params p: req.getParams()) {
503
504                 // Skip this if we already have an error
505                 if (msg.getError() != null) break;
506
507                 /*
508                  * Only Policy Identity or Policy URI is present.
509                  * Convert to a URI that we'll use
510                  */
511                 policyUri = p.getPolicy_uri();
512                 if (policyUri == null) {
513                         // Convert the RN/name to DN
514                         Uri rn = p.getPolicy_ident().getContext();
515                         String name = p.getPolicy_ident().getName();
516                         PolicyUri puri = new PolicyUri(rn.getValue());
517                         puri.push(name);
518                         policyUri = puri.getUri();
519                 }
520
521                 if (MessageUtils.hasEpg(policyUri.getValue())) {
522                     /*
523                      * Keep track of EPGs requested by agents.
524                      */
525                     EndpointGroupId egid = new EndpointGroupId(MessageUtils.
526                             getEndpointGroupFromUri(policyUri.getValue()));
527                     TenantId tid = new TenantId(MessageUtils.
528                             getTenantFromUri(policyUri.getValue()));
529                     EgKey epgId = new EgKey(tid, egid);
530
531                     removePolicySubscription(endpoint, epgId);
532                 }
533                 else {
534                     OpflexError error =
535                             new OpflexError();
536                     error.setMessage(UKNOWN_POLICY);
537                     msg.setError(error);
538                 }
539             }
540             response = msg;
541
542         }
543         if (response != null) {
544                 try {
545                     endpoint.sendResponse(response);
546                 }
547                 catch (Throwable t) {
548                 LOG.warn("Error sending response {}", t);
549                 }
550         }
551
552     }
553 }