/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.groupbasedpolicy.resolver;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.concurrent.Immutable;

import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.groupbasedpolicy.resolver.validator.ValidationResult;
import org.opendaylight.groupbasedpolicy.resolver.validator.Validator;
import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ClassifierDefinitionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.SubjectFeatureInstances;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.subject.feature.instances.ActionInstance;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.subject.feature.instances.ClassifierInstance;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPoliciesBuilder;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.collect.Table.Cell;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
/**
 * The policy resolver is a utility for renderers to help in resolving
 * group-based policy into a form that is easier to apply to the actual network.
 *
 * For any pair of endpoint groups, there is a set of rules that could apply to
 * the endpoints in that group based on the policy configuration. The exact list
 * of rules that apply to a given pair of endpoints depends on the conditions
 * that are active on the endpoints.
 *
 * We need to be able to query against this policy model, enumerate the relevant
 * classes of traffic and endpoints, and notify renderers when there are changes
 * to policy as it applies to active sets of endpoints and endpoint groups.
 *
 * The policy resolver will maintain the necessary state for all tenants in its
 * control domain, which is the set of tenants for which policy listeners have
 * been registered.
 */
81 public class PolicyResolver implements AutoCloseable {
83 private static final Logger LOG = LoggerFactory.getLogger(PolicyResolver.class);
85 private final DataBroker dataProvider;
86 private final ScheduledExecutorService executor;
89 * Keep track of the current relevant policy scopes.
91 protected CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
93 protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
96 * Store a policy object for each endpoint group pair. The table is stored
97 * with the key as (consumer, provider). Two endpoints could appear in both
98 * roles at the same time, in which case both policies would apply.
100 AtomicReference<PolicyInfo> policy = new AtomicReference<>();
103 * Store validators for ActionDefinitions from Renderers
106 protected SetMultimap<ActionDefinitionId, Validator<ActionInstance>> actionInstanceValidatorsByDefinition = Multimaps.synchronizedSetMultimap(HashMultimap.<ActionDefinitionId, Validator<ActionInstance>>create());
107 protected SetMultimap<ClassifierDefinitionId, Validator<ClassifierInstance>> classifierInstanceValidatorsByDefinition = Multimaps.synchronizedSetMultimap(HashMultimap.<ClassifierDefinitionId, Validator<ClassifierInstance>>create());
/**
 * Creates a resolver with an empty listener-scope list and an empty
 * tenant map.
 *
 * @param dataProvider the data broker to store for datastore access
 * @param executor the executor service to store
 */
public PolicyResolver(DataBroker dataProvider, ScheduledExecutorService executor) {
    this.policyListenerScopes = new CopyOnWriteArrayList<>();
    this.resolvedTenants = new ConcurrentHashMap<>();
    this.dataProvider = dataProvider;
    this.executor = executor;
    LOG.debug("Initialized renderer common policy resolver");
}
123 public void close() throws Exception {
124 for (TenantContext ctx : resolvedTenants.values()) {
125 if (ctx.registration != null) {
126 ctx.registration.close();
131 // *************************
132 // PolicyResolver public API
133 // *************************
135 * Get a snapshot of the current policy
137 * @return the {@link PolicyInfo} object representing an immutable snapshot
138 * of the policy state
140 public PolicyInfo getCurrentPolicy() {
145 * Get the normalized tenant for the given ID
147 * @param tenant the tenant ID
148 * @return the {@link Tenant}
150 public IndexedTenant getTenant(TenantId tenant) {
151 TenantContext tc = resolvedTenants.get(tenant);
155 return tc.tenant.get();
158 public void registerActionInstanceValidators(ActionDefinitionId actionDefinitionId,
159 Validator<ActionInstance> validator) {
160 actionInstanceValidatorsByDefinition.put(actionDefinitionId, validator);
163 public void unregisterActionInstanceValidators(ActionDefinitionId actionDefinitionId,
164 Validator<ActionInstance> validator) {
165 actionInstanceValidatorsByDefinition.remove(actionDefinitionId, validator);
168 public void registerClassifierInstanceValidators(ClassifierDefinitionId classifierDefinitionId,
169 Validator<ClassifierInstance> validator) {
170 classifierInstanceValidatorsByDefinition.put(classifierDefinitionId, validator);
173 public void unregisterClassifierInstanceValidators(ClassifierDefinitionId classifierDefinitionId,
174 Validator<ClassifierInstance> validator) {
175 classifierInstanceValidatorsByDefinition.remove(classifierDefinitionId, validator);
179 * Register a listener to receive update events.
181 * @param listener the {@link PolicyListener} object to receive the update
184 public PolicyScope registerListener(PolicyListener listener) {
185 PolicyScope ps = new PolicyScope(this, listener);
186 policyListenerScopes.add(ps);
192 * Remove the listener registered for the given {@link PolicyScope}.
194 * @param scope the scope to remove
195 * @see PolicyResolver#registerListener(PolicyListener)
197 public void removeListener(PolicyScope scope) {
198 policyListenerScopes.remove(scope);
205 * Atomically update the active policy and notify policy listeners of
208 * @param policyMap the new policy to set
209 * @param egConditions the map of endpoint groups to relevant condition sets
210 * @return the set of groups with updated policy
212 protected Set<EgKey> updatePolicy(Table<EgKey, EgKey, Policy> policyMap,
213 Map<EgKey, Set<ConditionSet>> egConditions,
214 List<PolicyScope> policyListenerScopes) {
215 PolicyInfo newPolicy = new PolicyInfo(policyMap, egConditions);
216 PolicyInfo oldPolicy = policy.getAndSet(newPolicy);
218 HashSet<EgKey> notifySet = new HashSet<>();
220 for (Cell<EgKey, EgKey, Policy> cell : newPolicy.getPolicyMap().cellSet()) {
221 Policy newp = cell.getValue();
223 if (oldPolicy != null) {
224 oldp = oldPolicy.getPolicyMap().get(cell.getRowKey(),
225 cell.getColumnKey());
227 if (oldp == null || !newp.equals(oldp)) {
228 notifySet.add(cell.getRowKey());
229 notifySet.add(cell.getColumnKey());
232 if (oldPolicy != null) {
233 for (Cell<EgKey, EgKey, Policy> cell : oldPolicy.getPolicyMap().cellSet()) {
234 if (!newPolicy.getPolicyMap().contains(cell.getRowKey(),
235 cell.getColumnKey())) {
236 notifySet.add(cell.getRowKey());
237 notifySet.add(cell.getColumnKey());
245 * Notify the policy listeners about a set of updated groups
247 private void notifyListeners(Set<EgKey> updatedGroups) {
248 for (final PolicyScope scope : policyListenerScopes) {
250 = Sets.filter(updatedGroups, new Predicate<EgKey>() {
252 public boolean apply(EgKey input) {
253 return scope.contains(input.getTenantId(),
257 if (!filtered.isEmpty()) {
258 scope.getListener().policyUpdated(filtered);
264 * Subscribe the resolver to updates related to a particular tenant Make
265 * sure that this can't be called concurrently with subscribe
267 * @param tenantId the tenant ID to subscribe to
269 protected void subscribeTenant(TenantId tenantId) {
270 if (!resolvedTenants.containsKey(tenantId)) {
271 updateTenant(tenantId);
276 * Unsubscribe the resolver from updates related to a particular tenant Make
277 * sure that this can't be called concurrently with subscribe
279 * @param tenantId the tenant ID to subscribe to
281 protected void unsubscribeTenant(TenantId tenantId) {
282 TenantContext context = resolvedTenants.get(tenantId);
283 if (context != null) {
284 resolvedTenants.remove(tenantId);
285 context.registration.close();
289 private void updateTenant(final TenantId tenantId) {
290 if (dataProvider == null) {
294 TenantContext context = resolvedTenants.get(tenantId);
295 if (context == null) {
296 ListenerRegistration<DataChangeListener> registration = null;
297 registration = dataProvider
298 .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
299 TenantUtils.tenantIid(tenantId),
300 new PolicyChangeListener(tenantId),
301 DataChangeScope.SUBTREE);
303 context = new TenantContext(registration);
304 TenantContext oldContext
305 = resolvedTenants.putIfAbsent(tenantId, context);
306 if (oldContext != null) {
307 // already registered in a different thread; just use the other
309 registration.close();
310 context = oldContext;
312 LOG.info("Added tenant {} to policy scope", tenantId);
316 // Resolve the new tenant and update atomically
317 final AtomicReference<IndexedTenant> tenantRef = context.tenant;
318 final IndexedTenant ot = tenantRef.get();
319 ReadOnlyTransaction transaction
320 = dataProvider.newReadOnlyTransaction();
321 final InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
322 ListenableFuture<Optional<Tenant>> unresolved;
324 unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
326 Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
328 public void onSuccess(Optional<Tenant> result) {
329 if (!result.isPresent()) {
330 LOG.info("Tenant {} not found in CONF; check&delete from OPER", tenantId);
331 deleteOperTenantIfExists(tiid, tenantId);
335 Tenant t = InheritanceUtils.resolveTenant(result.get());
336 SubjectFeatureInstances subjectFeatureInstances = t.getSubjectFeatureInstances();
337 if (subjectFeatureInstances != null) {
338 // TODO log and remove invalid action instances
339 if (actionInstancesAreValid(subjectFeatureInstances.getActionInstance())
340 && classifierInstancesAreValid(subjectFeatureInstances.getClassifierInstance())) {
341 IndexedTenant it = new IndexedTenant(t);
342 if (!tenantRef.compareAndSet(ot, it)) {
343 // concurrent update of tenant policy. Retry
344 updateTenant(tenantId);
346 // Update the policy cache and notify listeners
347 WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
348 wt.put(LogicalDatastoreType.OPERATIONAL, tiid, t, true);
357 public void onFailure(Throwable t) {
358 LOG.error("Count not get tenant {}", tenantId, t);
363 private void deleteOperTenantIfExists(final InstanceIdentifier<Tenant> tiid, final TenantId tenantId) {
364 final ReadWriteTransaction rwTx = dataProvider.newReadWriteTransaction();
366 ListenableFuture<Optional<Tenant>> readFuture = rwTx.read(LogicalDatastoreType.OPERATIONAL, tiid);
367 Futures.addCallback(readFuture, new FutureCallback<Optional<Tenant>>() {
369 public void onSuccess(Optional<Tenant> result) {
370 if(result.isPresent()){
371 TenantContext tenantContext = resolvedTenants.get(tenantId);
372 tenantContext.tenant.set(null);
373 rwTx.delete(LogicalDatastoreType.OPERATIONAL, tiid);
380 public void onFailure(Throwable t) {
381 LOG.error("Failed to read operational datastore: {}", t);
388 protected void updatePolicy() {
390 Map<EgKey, Set<ConditionSet>> egConditions = new HashMap<>();
391 Set<IndexedTenant> indexedTenants = getIndexedTenants(resolvedTenants.values());
392 Table<EgKey, EgKey, Policy> policyMap = PolicyResolverUtils.resolvePolicy(indexedTenants, egConditions);
393 Set<EgKey> updatedGroups = updatePolicy(policyMap, egConditions, policyListenerScopes);
394 updatePolicyInDataStore(policyMap);
395 //TODO the following will be removed when the policyInfo datastore is completed
396 notifyListeners(updatedGroups);
397 } catch (Exception e) {
398 LOG.error("Failed to update policy", e);
402 private void updatePolicyInDataStore(Table<EgKey, EgKey, Policy> policyMap) {
403 if (dataProvider == null) {
404 LOG.error("Couldn't Write Resolved Tenants Policy Info to Datastore because dataProvider is NULL");
407 ResolvedPolicies resolvedPolicies = new ResolvedPoliciesBuilder().setResolvedPolicy(
408 PolicyInfoUtils.buildResolvedPolicy(policyMap)).build();
410 WriteTransaction t = dataProvider.newWriteOnlyTransaction();
411 t.put(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(ResolvedPolicies.class).build(),
412 resolvedPolicies, true);
413 if (DataStoreHelper.submitToDs(t)) {
414 LOG.debug("Wrote resolved policies to Datastore");
416 LOG.error("Failed to write resolved policies to Datastore.");
419 private Set<IndexedTenant> getIndexedTenants(Collection<TenantContext> tenantCtxs) {
420 Set<IndexedTenant> result = new HashSet<>();
421 for (TenantContext tenant : tenantCtxs) {
422 IndexedTenant t = tenant.tenant.get();
431 * Validation of action instances.
433 * @param actionInstances list of instances to validate
434 * @return true if instances are valid or if <code>actionInstances</code>
435 * is <code>null</code>, Otherwise returns false.
438 private boolean actionInstancesAreValid(List<ActionInstance> actionInstances) {
439 if (actionInstances == null) {
442 for (ActionInstance actionInstance : actionInstances) {
443 Set<Validator<ActionInstance>> actionInstanceValidators = actionInstanceValidatorsByDefinition.get(actionInstance.getActionDefinitionId());
444 for (Validator<ActionInstance> actionInstanceValidator : actionInstanceValidators) {
445 ValidationResult validationResult = actionInstanceValidator.validate(actionInstance);
446 if (!validationResult.isValid()) {
447 LOG.error("ActionInstance {} is not valid!", actionInstance.getName());
456 * Validation of classifier instances.
458 * @param classifierInstances list of instances to validate
459 * @return true if instances are valid or if <code>classifierInstances</code>
460 * is <code>null</code>, Otherwise returns false.
463 private boolean classifierInstancesAreValid(List<ClassifierInstance> classifierInstances) {
464 if (classifierInstances == null) {
467 for (ClassifierInstance classifierInstance : classifierInstances) {
468 Set<Validator<ClassifierInstance>> classifierInstanceValidators = classifierInstanceValidatorsByDefinition.get(classifierInstance.getClassifierDefinitionId());
469 for (Validator<ClassifierInstance> classifierInstanceValidator : classifierInstanceValidators) {
470 ValidationResult validationResult = classifierInstanceValidator.validate(classifierInstance);
471 if (!validationResult.isValid()) {
472 LOG.error("ClassifierInstance {} is not valid!", classifierInstance.getName());
480 protected static class TenantContext {
482 ListenerRegistration<DataChangeListener> registration;
484 AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
486 public TenantContext(ListenerRegistration<DataChangeListener> registration) {
488 this.registration = registration;
493 private class PolicyChangeListener implements DataChangeListener {
495 final TenantId tenantId;
497 public PolicyChangeListener(TenantId tenantId) {
499 this.tenantId = tenantId;
503 public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
504 updateTenant(tenantId);