Separating renderers into features.
[groupbasedpolicy.git] / renderers / opflex / src / main / java / org / opendaylight / groupbasedpolicy / renderer / opflex / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.opflex;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ScheduledExecutorService;
20
21 import org.opendaylight.groupbasedpolicy.jsonrpc.JsonRpcEndpoint;
22 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcBroker;
23 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessage;
24 import org.opendaylight.groupbasedpolicy.jsonrpc.RpcMessageMap;
25 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent;
26 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService;
27 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role;
28 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.ManagedObject;
29 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexError;
30 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveRequest;
31 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyResolveResponse;
32 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveRequest;
33 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUnresolveResponse;
34 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.PolicyUpdateRequest;
35 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.MitLib;
36 import org.opendaylight.groupbasedpolicy.renderer.opflex.mit.PolicyUri;
37 import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
38 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
39 import org.opendaylight.groupbasedpolicy.resolver.IndexedTenant;
40 import org.opendaylight.groupbasedpolicy.resolver.Policy;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
45 import org.opendaylight.groupbasedpolicy.resolver.RuleGroup;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Uri;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ConditionName;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.EndpointGroupId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.Contract;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.collect.Lists;
55 import com.google.common.collect.Sets;
56
57
58 /**
59  * Manage policies on agents by subscribing to updates from the
60  * policy resolver and information about endpoints from the endpoint
61  * registry
62  * @author tbachman
63  */
64 public class PolicyManager
65      implements PolicyListener, RpcBroker.RpcCallback, AutoCloseable  {
66     private static final Logger LOG =
67             LoggerFactory.getLogger(PolicyManager.class);
68
69     private static final String UKNOWN_POLICY = "unknown policy name";
70
    /*
     * Collaborators handed in through the constructor, followed by the
     * subscription table. The table (epgSubscriptions) is indexed as
     * <EgKey:endpoint group> -> <Set<String>:IDs of the subscribed agents>
     */
76     final PolicyResolver policyResolver;
77     final OpflexConnectionService connectionService;
78     final ScheduledExecutorService executor;
79     private final MitLib mitLibrary;
80     private final PolicyScope policyScope;
81
82     private ConcurrentHashMap<EgKey, Set<String>> epgSubscriptions;
83     private RpcMessageMap messageMap = null;
84
85
86     public PolicyManager(PolicyResolver policyResolver,
87                          OpflexConnectionService connectionService,
88                          ScheduledExecutorService executor,
89                          MitLib mitLibrary) {
90         super();
91         this.executor = executor;
92         this.policyResolver = policyResolver;
93         this.connectionService = connectionService;
94         this.mitLibrary = mitLibrary;
95
96         epgSubscriptions = new ConcurrentHashMap<>();
97
98         /* Subscribe to PR messages */
99         messageMap = new RpcMessageMap();
100         List<RpcMessage> messages = Role.POLICY_REPOSITORY.getMessages();
101         messageMap.addList(messages);
102         for (RpcMessage msg: messages) {
103             this.connectionService.subscribe(msg, this);
104         }
105
106         policyScope = policyResolver.registerListener(this);
107
108         LOG.debug("Initialized OpFlex policy manager");
109     }
110
111     /**
112      * Shut down the {@link PolicyManager}. Implemented from the
113      * AutoCloseable interface.
114      */
115     @Override
116     public void close() throws ExecutionException, InterruptedException {
117
118     }
119
120     // **************
121     // PolicyListener
122     // **************
123
124     @Override
125     public void policyUpdated(Set<EgKey> updatedConsumers) {
126
127         sendPolicyUpdates(updatedConsumers);
128     }
129
130     // **************
131     // Implementation
132     // **************
133
134     /**
135      * Update all policy on all agents as needed.  Note that this will block
136      * one of the threads on the executor.
137      * @author tbachman
138      */
139     private void sendPolicyUpdates(Set<EgKey> updatedConsumers) {
140         Map<String, Set<EgKey>> agentMap = new HashMap<String, Set<EgKey>>();
141
142         PolicyInfo info = policyResolver.getCurrentPolicy();
143         if (info == null) return;
144
145         /*
146          * First build a per-agent set of EPGs that need updating
147          */
148         for (EgKey cepg: updatedConsumers) {
149
150                 /*
151                  * Find the set of agents that have subscribed to
152                  * updates for this EPG
153                  */
154             for (String agentId: epgSubscriptions.get(cepg)) {
155                 Set<EgKey> egSet = agentMap.get(agentId);
156                 if (egSet == null) {
157                     egSet =
158                           Collections.
159                           newSetFromMap(new ConcurrentHashMap<EgKey, Boolean>());
160                     agentMap.put(agentId, egSet);
161                 }
162                 egSet.add(cepg);
163             }
164         }
165
166         /*
167          * Go through each agent and provide a single update for all EPGs
168          */
169         for (Map.Entry<String,Set<EgKey>> entry: agentMap.entrySet()) {
170             OpflexAgent agent = connectionService.
171                     getOpflexAgent(entry.getKey());
172             if (agent == null) continue;
173
174             sendPolicyUpdate(agent.getEndpoint(), entry.getValue(), info);
175
176         }
177     }
178
179     /**
180      * This implements Runnable, which allows the {@link ScheduledExecutorservice}
181      * to execute the run() method to implement the update
182      *
183      * @author tbachman
184      *
185      */
186     private class PolicyUpdate implements Runnable {
187         private final JsonRpcEndpoint agent;
188         private final Set<EgKey> epgSet;
189         private final PolicyInfo info;
190
191         PolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
192             this.agent = agent;
193             this.epgSet = epgSet;
194             this.info = info;
195         }
196
197         @Override
198         public void run() {
199             List<ManagedObject> subtrees =
200                     new ArrayList<ManagedObject>();
201
202             PolicyUpdateRequest request =
203                     new PolicyUpdateRequest();
204             List<PolicyUpdateRequest.Params> paramsList =
205                     new ArrayList<>();
206             PolicyUpdateRequest.Params params =
207                     new PolicyUpdateRequest.Params();
208
209             /*
210              * We may need to optimize this in the future. Currently
211              * we send down the EPG MOs and all the related policy
212              * that's in scope from the PolicyResolver. If we want
213              * to optimize this in the future to only send the policy
214              * objects that changed, we'd either have to change the
215              * PolicyResolver to provide this delta, or we'd have to
216              * keep cached state for each node.
217              */
218             for (EgKey epg: epgSet) {
219                 /*
220                  * Get EPGs from the IndexedTenant, as the EPGs from
221                  * the IndexedTenenat alread has collapsed the EPGs
222                  * (i.e. inheritance accounted for)
223                  *
224                  * TODO: needed?
225                  */
226
227                 IndexedTenant it = policyResolver.getTenant(epg.getTenantId());
228                 List<ManagedObject> relatedMos = getPolicy(epg, info, it);
229                 subtrees.addAll(relatedMos);
230             }
231
232             /*
233              * Currently not using delete URI or merge_children MOs
234              */
235             params.setDelete_uri(new ArrayList<Uri>());
236             params.setMerge_children(new ArrayList<ManagedObject>());
237             params.setReplace(subtrees);
238             paramsList.add(params);
239             request.setParams(paramsList);
240             try {
241                 agent.sendRequest(request);
242             }
243             catch (Throwable t) {
244
245             }
246         }
247     }
248
249     void sendPolicyUpdate(JsonRpcEndpoint agent, Set<EgKey> epgSet, PolicyInfo info) {
250         executor.execute(new PolicyUpdate(agent, epgSet, info));
251     }
252
253
254
255
256     /**
257      * This method creates {@link ManagedObject} POJOs for all of the
258      * policy objects that need to be sent as a result of policy
259      * resolution for the given EPG.
260      *
261      * @param epg The Endpoint Group that was resolved
262      * @param policySnapshot A snapshot of the current resolved policy
263      */
264     private List<ManagedObject> getPolicy(EgKey epg,
265             PolicyInfo policySnapshot, IndexedTenant it) {
266         if (policySnapshot == null) return null;
267
268         Set<ManagedObject> policyMos = Sets.newHashSet();
269         Set<EgKey> peers = policySnapshot.getPeers(epg);
270
271         if (peers == null || peers.size() <= 0) return null;
272
273         // Allocate an MO for the requested EPG
274         ManagedObject epgMo = new ManagedObject();
275         for (EgKey depg: peers) {
276                 /*
277                  * Construct the base URI, so that we can
278                  * continue adding on to create child MOs.
279                  * We use the peer EPG for getting the policy
280                  */
281             PolicyUri uri = new PolicyUri();
282             uri.push(MessageUtils.TENANTS_RN);
283             uri.push(MessageUtils.TENANT_RN);
284             uri.push(depg.getTenantId().getValue());
285
286             Policy policy = policySnapshot.getPolicy(epg, depg);
287             if (policy == null || policy == Policy.EMPTY) continue;
288
289             /*
290              * We now have a policy that we need to send to the agent.
291              * Provide empty condition lists for now - need to be
292              * an actual empty list, instead of null
293              *
294              * TODO: get actual condition groups
295              */
296             List<ConditionName> conds = new ArrayList<ConditionName>();
297             ConditionGroup cgSrc = policySnapshot.getEgCondGroup(epg, conds);
298             ConditionGroup cgDst = policySnapshot.getEgCondGroup(depg, conds);
299             List<RuleGroup> rgl = policy.getRules(cgSrc, cgDst);
300
301             /*
302              * RuleGroups can refer to the same contract. As result,
303              * we need to keep track of contracts returned and merge
304              * the results into a single ManagedObject
305              */
306             Map<Contract, ManagedObject> contracts =
307                     new ConcurrentHashMap<Contract, ManagedObject>();
308
309             for (RuleGroup rg: rgl) {
310
311                 /*
312                  * Construct a new URI for the EPG requested.
313                  * In this case, we want the requested EPG, not
314                  * the peer EPG
315                  */
316                 PolicyUri puri = new PolicyUri();
317                 puri.push(MessageUtils.TENANTS_RN);
318                 puri.push(MessageUtils.TENANT_RN);
319                 puri.push(epg.getTenantId().getValue());
320                 puri.push(MessageUtils.EPG_RN);
321                 puri.push(epg.getEgId().getValue());
322                 Set<ManagedObject> epgMos = MessageUtils.
323                             getEndpointGroupMo(epgMo,
324                                                puri,
325                                                it.getEndpointGroup(epg.getEgId()),
326                                                rg);
327                 if (epgMos != null) {
328                     policyMos.addAll(epgMos);
329                 }
330
331
332                 Contract c = rg.getRelatedContract();
333                 /*
334                  * This cmol list is used as a container to pass
335                  * an out parameter for the contract MO. This MO
336                  * is returned separately from the others because
337                  * may require merging -- different RuleGroup
338                  * objects can refer to the same contract
339                  */
340                 List<ManagedObject> cmol = new ArrayList<ManagedObject>();
341                 List<ManagedObject> mol = null;
342
343                 uri.push(MessageUtils.CONTRACT_RN);
344                 uri.push(c.getId().getValue());
345                 mol = MessageUtils.getContractAndSubMos(cmol, uri, c, rg, it);
346                 if (mol == null) continue;
347
348                 // walk back to the tenant for next contract URI
349                 uri.pop(); uri.pop();
350
351                 if(contracts.get(c) != null) {
352                     /*
353                      * Aggregate the child URIs and properties.
354                      */
355                         MessageUtils.mergeMos(contracts.get(c), cmol.remove(0));
356                 }
357                 else {
358                     contracts.put(c, cmol.remove(0));
359                 }
360                 policyMos.addAll(mol);
361             }
362             // add in the EPG
363             policyMos.add(epgMo);
364             // add in the contracts
365             policyMos.addAll(contracts.values());
366         }
367         return Lists.newArrayList(policyMos);
368     }
369
370     private void addPolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
371         policyScope.addToScope(epgId.getTenantId(), epgId.getEgId());
372
373         Set<String> agents = epgSubscriptions.get(epgId);
374         if (agents == null) {
375             agents = Collections.
376                     newSetFromMap(new ConcurrentHashMap<String, Boolean>());
377             Set<String> result = epgSubscriptions.putIfAbsent(epgId, agents);
378             if (result != null) {
379                 agents = result;
380             }
381         }
382         agents.add(endpoint.getIdentifier());
383
384     }
385
386     private void removePolicySubscription(JsonRpcEndpoint endpoint, EgKey epgId) {
387         Set<String> agents = epgSubscriptions.get(epgId);
388         if (agents != null) {
389             agents.remove(endpoint.getIdentifier());
390         }
391         policyScope.removeFromScope(epgId.getTenantId(), epgId.getEgId());
392     }
393
394     @Override
395     public void callback(JsonRpcEndpoint endpoint, RpcMessage request) {
396
397         if (messageMap.get(request.getMethod()) == null) {
398             LOG.warn("message {} was not subscribed to, but was delivered.", request);
399             return;
400         }
401
402         RpcMessage response = null;
403
404         if (request instanceof PolicyResolveRequest) {
405             PolicyResolveRequest req = (PolicyResolveRequest)request;
406             PolicyResolveResponse msg = new PolicyResolveResponse();
407             PolicyResolveResponse.Result result = new PolicyResolveResponse.Result();
408             msg.setId(request.getId());
409
410             if (!req.valid()) {
411                 LOG.warn("Invalid resolve request: {}", req);
412                 OpflexError error = new OpflexError();
413                 error.setCode(OpflexError.ErrorCode.ERROR.toString());
414                 //error.setData(data);
415                 //error.setMessage(message);
416                 //error.setTrace(trace);
417                 msg.setError(error);
418             }
419
420             Uri policyUri = null;
421                 List<ManagedObject> mol = new ArrayList<ManagedObject>();
422
423             for (PolicyResolveRequest.Params p: req.getParams()) {
424
425                 // Skip this if we already have an error
426                 if (msg.getError() != null) break;
427
428                 /*
429                  * Only Policy Identity or Policy URI is present.
430                  * Convert Policy Identities to a URI that we can use
431                  */
432                 policyUri = p.getPolicy_uri();
433                 if (policyUri == null) {
434                         Uri rn = p.getPolicy_ident().getContext();
435                         String name = p.getPolicy_ident().getName();
436                         PolicyUri puri = new PolicyUri(rn.getValue());
437                         puri.push(name);
438                         policyUri = puri.getUri();
439                 }
440
441                 // See if the request has an EPG in the URI
442                 if (MessageUtils.hasEpg(policyUri.getValue())) {
443                     /*
444                      * Keep track of EPGs requested by agents.
445                      */
446                     EndpointGroupId egid = new EndpointGroupId(MessageUtils.
447                             getEndpointGroupFromUri(policyUri.getValue()));
448                     TenantId tid = new TenantId(MessageUtils.
449                             getTenantFromUri(policyUri.getValue()));
450                     EgKey epgId = new EgKey(tid, egid);
451
452                     addPolicySubscription(endpoint, epgId);
453
454                     IndexedTenant it = policyResolver.getTenant(tid);
455                     if (it != null) {
456                         List<ManagedObject> relatedMos =
457                                 getPolicy(epgId, policyResolver.getCurrentPolicy(), it);
458                         if (relatedMos != null) {
459                             mol.addAll(relatedMos);
460                         }
461                     }
462                 }
463                 else {
464                     OpflexError error =
465                             new OpflexError();
466                     error.setMessage(UKNOWN_POLICY);
467                     error.setCode(OpflexError.ErrorCode.EUNSUPPORTED.toString());
468                     //error.setData(data);
469                     //error.setTrace(trace);
470                     msg.setError(error);
471                 }
472
473             }
474             result.setPolicy(mol);
475             msg.setResult(result);
476             response = msg;
477         }
478         else if (request instanceof PolicyUnresolveRequest) {
479             PolicyUnresolveRequest req = (PolicyUnresolveRequest)request;
480             PolicyUnresolveResponse msg = new PolicyUnresolveResponse();
481             msg.setId(request.getId());
482             Uri policyUri;
483
484             if (!req.valid()) {
485                 OpflexError error = new OpflexError();
486                 error.setCode(OpflexError.ErrorCode.ERROR.toString());
487                 //error.setData(data);
488                 //error.setMessage(message);
489                 //error.setTrace(trace);
490                 msg.setError(error);
491             }
492
493             for (PolicyUnresolveRequest.Params p: req.getParams()) {
494
495                 // Skip this if we already have an error
496                 if (msg.getError() != null) break;
497
498                 /*
499                  * Only Policy Identity or Policy URI is present.
500                  * Convert to a URI that we'll use
501                  */
502                 policyUri = p.getPolicy_uri();
503                 if (policyUri == null) {
504                         // Convert the RN/name to DN
505                         Uri rn = p.getPolicy_ident().getContext();
506                         String name = p.getPolicy_ident().getName();
507                         PolicyUri puri = new PolicyUri(rn.getValue());
508                         puri.push(name);
509                         policyUri = puri.getUri();
510                 }
511
512                 if (MessageUtils.hasEpg(policyUri.getValue())) {
513                     /*
514                      * Keep track of EPGs requested by agents.
515                      */
516                     EndpointGroupId egid = new EndpointGroupId(MessageUtils.
517                             getEndpointGroupFromUri(policyUri.getValue()));
518                     TenantId tid = new TenantId(MessageUtils.
519                             getTenantFromUri(policyUri.getValue()));
520                     EgKey epgId = new EgKey(tid, egid);
521
522                     removePolicySubscription(endpoint, epgId);
523                 }
524                 else {
525                     OpflexError error =
526                             new OpflexError();
527                     error.setMessage(UKNOWN_POLICY);
528                     msg.setError(error);
529                 }
530             }
531             response = msg;
532
533         }
534         if (response != null) {
535                 try {
536                     endpoint.sendResponse(response);
537                 }
538                 catch (Throwable t) {
539                 LOG.warn("Error sending response {}", t);
540                 }
541         }
542
543     }
544 }