Simplifying validators
[groupbasedpolicy.git] / groupbasedpolicy / src / main / java / org / opendaylight / groupbasedpolicy / resolver / PolicyResolver.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.groupbasedpolicy.resolver;
9
10 import java.util.Collection;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.CopyOnWriteArrayList;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.atomic.AtomicReference;
21
22 import javax.annotation.concurrent.Immutable;
23
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.groupbasedpolicy.resolver.validator.ValidationResult;
33 import org.opendaylight.groupbasedpolicy.resolver.validator.Validator;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ClassifierDefinitionId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPoliciesBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.SubjectFeatureInstances;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.subject.feature.instances.ActionInstance;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.subject.feature.instances.ClassifierInstance;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.binding.DataObject;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import com.google.common.base.Optional;
50 import com.google.common.base.Predicate;
51 import com.google.common.collect.HashMultimap;
52 import com.google.common.collect.Multimaps;
53 import com.google.common.collect.SetMultimap;
54 import com.google.common.collect.Sets;
55 import com.google.common.collect.Table;
56 import com.google.common.collect.Table.Cell;
57 import com.google.common.util.concurrent.FutureCallback;
58 import com.google.common.util.concurrent.Futures;
59 import com.google.common.util.concurrent.ListenableFuture;
60
61 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
62
63 /**
64  * The policy resolver is a utility for renderers to help in resolving
65  * group-based policy into a form that is easier to apply to the actual network.
66  *
67  * For any pair of endpoint groups, there is a set of rules that could apply to
68  * the endpoints on that group based on the policy configuration. The exact list
69  * of rules that apply to a given pair of endpoints depends on the conditions
70  * that are active on the endpoints.
71  *
72  * We need to be able to query against this policy model, enumerate the relevant
73  * classes of traffic and endpoints, and notify renderers when there are changes
74  * to policy as it applies to active sets of endpoints and endpoint groups.
75  *
76  * The policy resolver will maintain the necessary state for all tenants in its
77  * control domain, which is the set of tenants for which policy listeners have
78  * been registered.
79  *
80  */
81 public class PolicyResolver implements AutoCloseable {
82
83     private static final Logger LOG = LoggerFactory.getLogger(PolicyResolver.class);
84
85     private final DataBroker dataProvider;
86     private final ScheduledExecutorService executor;
87
88     /**
89      * Keep track of the current relevant policy scopes.
90      */
91     protected CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
92
93     protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
94
95     /**
96      * Store a policy object for each endpoint group pair. The table is stored
97      * with the key as (consumer, provider). Two endpoints could appear in both
98      * roles at the same time, in which case both policies would apply.
99      */
100     AtomicReference<PolicyInfo> policy = new AtomicReference<>();
101
    /*
     * Validators supplied by renderers, keyed by the action definition or
     * classifier definition whose instances they validate.
     */
106     protected SetMultimap<ActionDefinitionId, Validator<ActionInstance>> actionInstanceValidatorsByDefinition = Multimaps.synchronizedSetMultimap(HashMultimap.<ActionDefinitionId, Validator<ActionInstance>>create());
107     protected SetMultimap<ClassifierDefinitionId, Validator<ClassifierInstance>> classifierInstanceValidatorsByDefinition = Multimaps.synchronizedSetMultimap(HashMultimap.<ClassifierDefinitionId, Validator<ClassifierInstance>>create());
108
109     public PolicyResolver(DataBroker dataProvider,
110             ScheduledExecutorService executor) {
111         super();
112         this.dataProvider = dataProvider;
113         this.executor = executor;
114         policyListenerScopes = new CopyOnWriteArrayList<>();
115         resolvedTenants = new ConcurrentHashMap<>();
116         LOG.debug("Initialized renderer common policy resolver");
117     }
118
119     // *************
120     // AutoCloseable
121     // *************
122     @Override
123     public void close() throws Exception {
124         for (TenantContext ctx : resolvedTenants.values()) {
125             if (ctx.registration != null) {
126                 ctx.registration.close();
127             }
128         }
129     }
130
131     // *************************
132     // PolicyResolver public API
133     // *************************
134     /**
135      * Get a snapshot of the current policy
136      *
137      * @return the {@link PolicyInfo} object representing an immutable snapshot
138      * of the policy state
139      */
140     public PolicyInfo getCurrentPolicy() {
141         return policy.get();
142     }
143
144     /**
145      * Get the normalized tenant for the given ID
146      *
147      * @param tenant the tenant ID
148      * @return the {@link Tenant}
149      */
150     public IndexedTenant getTenant(TenantId tenant) {
151         TenantContext tc = resolvedTenants.get(tenant);
152         if (tc == null) {
153             return null;
154         }
155         return tc.tenant.get();
156     }
157
158     public void registerActionInstanceValidators(ActionDefinitionId actionDefinitionId,
159             Validator<ActionInstance> validator) {
160         actionInstanceValidatorsByDefinition.put(actionDefinitionId, validator);
161     }
162
163     public void unregisterActionInstanceValidators(ActionDefinitionId actionDefinitionId,
164             Validator<ActionInstance> validator) {
165         actionInstanceValidatorsByDefinition.remove(actionDefinitionId, validator);
166     }
167
168     public void registerClassifierInstanceValidators(ClassifierDefinitionId classifierDefinitionId,
169             Validator<ClassifierInstance> validator) {
170         classifierInstanceValidatorsByDefinition.put(classifierDefinitionId, validator);
171     }
172
173     public void unregisterClassifierInstanceValidators(ClassifierDefinitionId classifierDefinitionId,
174             Validator<ClassifierInstance> validator) {
175         classifierInstanceValidatorsByDefinition.remove(classifierDefinitionId, validator);
176     }
177
178     /**
179      * Register a listener to receive update events.
180      *
181      * @param listener the {@link PolicyListener} object to receive the update
182      * events
183      */
184     public PolicyScope registerListener(PolicyListener listener) {
185         PolicyScope ps = new PolicyScope(this, listener);
186         policyListenerScopes.add(ps);
187
188         return ps;
189     }
190
191     /**
192      * Remove the listener registered for the given {@link PolicyScope}.
193      *
194      * @param scope the scope to remove
195      * @see PolicyResolver#registerListener(PolicyListener)
196      */
197     public void removeListener(PolicyScope scope) {
198         policyListenerScopes.remove(scope);
199     }
200
201     // **************
202     // Implementation
203     // **************
204     /**
205      * Atomically update the active policy and notify policy listeners of
206      * relevant changes
207      *
208      * @param policyMap the new policy to set
209      * @param egConditions the map of endpoint groups to relevant condition sets
210      * @return the set of groups with updated policy
211      */
212     protected Set<EgKey> updatePolicy(Table<EgKey, EgKey, Policy> policyMap,
213             Map<EgKey, Set<ConditionSet>> egConditions,
214             List<PolicyScope> policyListenerScopes) {
215         PolicyInfo newPolicy = new PolicyInfo(policyMap, egConditions);
216         PolicyInfo oldPolicy = policy.getAndSet(newPolicy);
217
218         HashSet<EgKey> notifySet = new HashSet<>();
219
220         for (Cell<EgKey, EgKey, Policy> cell : newPolicy.getPolicyMap().cellSet()) {
221             Policy newp = cell.getValue();
222             Policy oldp = null;
223             if (oldPolicy != null) {
224                 oldp = oldPolicy.getPolicyMap().get(cell.getRowKey(),
225                         cell.getColumnKey());
226             }
227             if (oldp == null || !newp.equals(oldp)) {
228                 notifySet.add(cell.getRowKey());
229                 notifySet.add(cell.getColumnKey());
230             }
231         }
232         if (oldPolicy != null) {
233             for (Cell<EgKey, EgKey, Policy> cell : oldPolicy.getPolicyMap().cellSet()) {
234                 if (!newPolicy.getPolicyMap().contains(cell.getRowKey(),
235                         cell.getColumnKey())) {
236                     notifySet.add(cell.getRowKey());
237                     notifySet.add(cell.getColumnKey());
238                 }
239             }
240         }
241         return notifySet;
242     }
243
244     /**
245      * Notify the policy listeners about a set of updated groups
246      */
247     private void notifyListeners(Set<EgKey> updatedGroups) {
248         for (final PolicyScope scope : policyListenerScopes) {
249             Set<EgKey> filtered
250                     = Sets.filter(updatedGroups, new Predicate<EgKey>() {
251                         @Override
252                         public boolean apply(EgKey input) {
253                             return scope.contains(input.getTenantId(),
254                                     input.getEgId());
255                         }
256                     });
257             if (!filtered.isEmpty()) {
258                 scope.getListener().policyUpdated(filtered);
259             }
260         }
261     }
262
263     /**
264      * Subscribe the resolver to updates related to a particular tenant Make
265      * sure that this can't be called concurrently with subscribe
266      *
267      * @param tenantId the tenant ID to subscribe to
268      */
269     protected void subscribeTenant(TenantId tenantId) {
270         if (!resolvedTenants.containsKey(tenantId)) {
271             updateTenant(tenantId);
272         }
273     }
274
275     /**
276      * Unsubscribe the resolver from updates related to a particular tenant Make
277      * sure that this can't be called concurrently with subscribe
278      *
279      * @param tenantId the tenant ID to subscribe to
280      */
281     protected void unsubscribeTenant(TenantId tenantId) {
282         TenantContext context = resolvedTenants.get(tenantId);
283         if (context != null) {
284             resolvedTenants.remove(tenantId);
285             context.registration.close();
286         }
287     }
288
289     private void updateTenant(final TenantId tenantId) {
290         if (dataProvider == null) {
291             return;
292         }
293
294         TenantContext context = resolvedTenants.get(tenantId);
295         if (context == null) {
296             ListenerRegistration<DataChangeListener> registration = null;
297             registration = dataProvider
298                     .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
299                             TenantUtils.tenantIid(tenantId),
300                             new PolicyChangeListener(tenantId),
301                             DataChangeScope.SUBTREE);
302
303             context = new TenantContext(registration);
304             TenantContext oldContext
305                     = resolvedTenants.putIfAbsent(tenantId, context);
306             if (oldContext != null) {
307                 // already registered in a different thread; just use the other
308                 // context
309                 registration.close();
310                 context = oldContext;
311             } else {
312                 LOG.info("Added tenant {} to policy scope", tenantId);
313             }
314         }
315
316         // Resolve the new tenant and update atomically
317         final AtomicReference<IndexedTenant> tenantRef = context.tenant;
318         final IndexedTenant ot = tenantRef.get();
319         ReadOnlyTransaction transaction
320                 = dataProvider.newReadOnlyTransaction();
321         final InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
322         ListenableFuture<Optional<Tenant>> unresolved;
323
324         unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
325
326         Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
327             @Override
328             public void onSuccess(Optional<Tenant> result) {
329                 if (!result.isPresent()) {
330                     LOG.info("Tenant {} not found in CONF; check&delete from OPER", tenantId);
331                     deleteOperTenantIfExists(tiid, tenantId);
332                     return;
333                 }
334
335                 Tenant t = InheritanceUtils.resolveTenant(result.get());
336                 SubjectFeatureInstances subjectFeatureInstances = t.getSubjectFeatureInstances();
337                 if (subjectFeatureInstances != null) {
338                     // TODO log and remove invalid action instances
339                     if (actionInstancesAreValid(subjectFeatureInstances.getActionInstance())
340                             && classifierInstancesAreValid(subjectFeatureInstances.getClassifierInstance())) {
341                     IndexedTenant it = new IndexedTenant(t);
342                     if (!tenantRef.compareAndSet(ot, it)) {
343                         // concurrent update of tenant policy. Retry
344                         updateTenant(tenantId);
345                     } else {
346                         // Update the policy cache and notify listeners
347                         WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
348                         wt.put(LogicalDatastoreType.OPERATIONAL, tiid, t, true);
349                         wt.submit();
350                         updatePolicy();
351                     }
352                 }
353             }
354         }
355
356             @Override
357             public void onFailure(Throwable t) {
358                 LOG.error("Count not get tenant {}", tenantId, t);
359             }
360         }, executor);
361     }
362
363     private void deleteOperTenantIfExists(final InstanceIdentifier<Tenant> tiid, final TenantId tenantId) {
364         final ReadWriteTransaction rwTx = dataProvider.newReadWriteTransaction();
365
366         ListenableFuture<Optional<Tenant>> readFuture = rwTx.read(LogicalDatastoreType.OPERATIONAL, tiid);
367         Futures.addCallback(readFuture, new FutureCallback<Optional<Tenant>>() {
368             @Override
369             public void onSuccess(Optional<Tenant> result) {
370                 if(result.isPresent()){
371                     TenantContext tenantContext = resolvedTenants.get(tenantId);
372                     tenantContext.tenant.set(null);
373                     rwTx.delete(LogicalDatastoreType.OPERATIONAL, tiid);
374                     rwTx.submit();
375                     updatePolicy();
376                 }
377             }
378
379             @Override
380             public void onFailure(Throwable t) {
381                 LOG.error("Failed to read operational datastore: {}", t);
382                 rwTx.cancel();
383             }
384         }, executor);
385
386     }
387
388     protected void updatePolicy() {
389         try {
390             Map<EgKey, Set<ConditionSet>> egConditions = new HashMap<>();
391             Set<IndexedTenant> indexedTenants = getIndexedTenants(resolvedTenants.values());
392             Table<EgKey, EgKey, Policy> policyMap = PolicyResolverUtils.resolvePolicy(indexedTenants, egConditions);
393             Set<EgKey> updatedGroups = updatePolicy(policyMap, egConditions, policyListenerScopes);
394             updatePolicyInDataStore(policyMap);
395             //TODO the following will be removed when the policyInfo datastore is completed
396             notifyListeners(updatedGroups);
397         } catch (Exception e) {
398             LOG.error("Failed to update policy", e);
399         }
400     }
401
402     private void updatePolicyInDataStore(Table<EgKey, EgKey, Policy> policyMap) {
403         if (dataProvider == null) {
404             LOG.error("Couldn't Write Resolved Tenants Policy Info to Datastore because dataProvider is NULL");
405             return;
406         }
407         ResolvedPolicies resolvedPolicies = new ResolvedPoliciesBuilder().setResolvedPolicy(
408                 PolicyInfoUtils.buildResolvedPolicy(policyMap)).build();
409
410         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
411         t.put(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(ResolvedPolicies.class).build(),
412                 resolvedPolicies, true);
413         if (DataStoreHelper.submitToDs(t)) {
414             LOG.debug("Wrote resolved policies to Datastore");
415         } else {
416             LOG.error("Failed to write resolved policies to Datastore.");
417         }
418     }
419     private Set<IndexedTenant> getIndexedTenants(Collection<TenantContext> tenantCtxs) {
420         Set<IndexedTenant> result = new HashSet<>();
421         for (TenantContext tenant : tenantCtxs) {
422             IndexedTenant t = tenant.tenant.get();
423             if (t != null) {
424                 result.add(t);
425             }
426         }
427         return result;
428     }
429
430     /**
431      * Validation of action instances.
432      *
433      * @param actionInstances list of instances to validate
434      * @return true if instances are valid or if <code>actionInstances</code>
435      * is <code>null</code>, Otherwise returns false.
436      *
437      */
438     private boolean actionInstancesAreValid(List<ActionInstance> actionInstances) {
439         if (actionInstances == null) {
440             return true;
441         }
442         for (ActionInstance actionInstance : actionInstances) {
443             Set<Validator<ActionInstance>> actionInstanceValidators = actionInstanceValidatorsByDefinition.get(actionInstance.getActionDefinitionId());
444             for (Validator<ActionInstance> actionInstanceValidator : actionInstanceValidators) {
445                 ValidationResult validationResult = actionInstanceValidator.validate(actionInstance);
446                 if (!validationResult.isValid()) {
447                     LOG.error("ActionInstance {} is not valid!", actionInstance.getName());
448                     return false;
449                 }
450             }
451         }
452         return true;
453     }
454
455     /**
456      * Validation of classifier instances.
457      *
458      * @param classifierInstances list of instances to validate
459      * @return true if instances are valid or if <code>classifierInstances</code>
460      * is <code>null</code>, Otherwise returns false.
461      *
462      */
463     private boolean classifierInstancesAreValid(List<ClassifierInstance> classifierInstances) {
464         if (classifierInstances == null) {
465             return true;
466         }
467         for (ClassifierInstance classifierInstance : classifierInstances) {
468             Set<Validator<ClassifierInstance>> classifierInstanceValidators = classifierInstanceValidatorsByDefinition.get(classifierInstance.getClassifierDefinitionId());
469             for (Validator<ClassifierInstance> classifierInstanceValidator : classifierInstanceValidators) {
470                 ValidationResult validationResult = classifierInstanceValidator.validate(classifierInstance);
471                 if (!validationResult.isValid()) {
472                     LOG.error("ClassifierInstance {} is not valid!", classifierInstance.getName());
473                     return false;
474                 }
475             }
476         }
477         return true;
478     }
479
480     protected static class TenantContext {
481
482         ListenerRegistration<DataChangeListener> registration;
483
484         AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
485
486         public TenantContext(ListenerRegistration<DataChangeListener> registration) {
487             super();
488             this.registration = registration;
489         }
490     }
491
492     @Immutable
493     private class PolicyChangeListener implements DataChangeListener {
494
495         final TenantId tenantId;
496
497         public PolicyChangeListener(TenantId tenantId) {
498             super();
499             this.tenantId = tenantId;
500         }
501
502         @Override
503         public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
504             updateTenant(tenantId);
505         }
506
507     }
508
509 }