2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.resolver;
11 import java.util.Collection;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.CopyOnWriteArrayList;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.atomic.AtomicReference;
23 import javax.annotation.concurrent.Immutable;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
27 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.has.action.refs.ActionRef;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.Contract;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.contract.Subject;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.contract.subject.Rule;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.subject.feature.instances.ActionInstance;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.binding.DataObject;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import com.google.common.base.Optional;
47 import com.google.common.base.Predicate;
48 import com.google.common.collect.Sets;
49 import com.google.common.collect.Table;
50 import com.google.common.collect.Table.Cell;
51 import com.google.common.util.concurrent.FutureCallback;
52 import com.google.common.util.concurrent.Futures;
53 import com.google.common.util.concurrent.ListenableFuture;
56 * The policy resolver is a utility for renderers to help in resolving
57 * group-based policy into a form that is easier to apply to the actual network.
59 * For any pair of endpoint groups, there is a set of rules that could apply to
60 * the endpoints on that group based on the policy configuration. The exact list
61 * of rules that apply to a given pair of endpoints depends on the conditions
62 * that are active on the endpoints.
64 * We need to be able to query against this policy model, enumerate the relevant
65 * classes of traffic and endpoints, and notify renderers when there are changes
66 * to policy as it applies to active sets of endpoints and endpoint groups.
68 * The policy resolver will maintain the necessary state for all tenants in its
69 * control domain, which is the set of tenants for which policy listeners have
73 public class PolicyResolver implements AutoCloseable {
74 private static final Logger LOG = LoggerFactory.getLogger(PolicyResolver.class);
76 private final DataBroker dataProvider;
77 private final ScheduledExecutorService executor;
80 * Keep track of the current relevant policy scopes.
82 protected CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
84 protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
88 * Store a policy object for each endpoint group pair. The table is stored
89 * with the key as (consumer, provider). Two endpoints could appear in both
90 * roles at the same time, in which case both policies would apply.
92 AtomicReference<PolicyInfo> policy = new AtomicReference<>();
95 * Store validators for ActionDefinitions from Renderers
99 protected ConcurrentMap<ActionDefinitionId, ActionInstanceValidator> registeredActions = new ConcurrentHashMap<>();
101 public PolicyResolver(DataBroker dataProvider,
102 ScheduledExecutorService executor) {
104 this.dataProvider = dataProvider;
105 this.executor = executor;
106 policyListenerScopes = new CopyOnWriteArrayList<>();
107 resolvedTenants = new ConcurrentHashMap<>();
108 LOG.debug("Initialized renderer common policy resolver");
116 public void close() throws Exception {
117 for (TenantContext ctx : resolvedTenants.values()) {
118 if (ctx.registration != null)
119 ctx.registration.close();
123 // *************************
124 // PolicyResolver public API
125 // *************************
128 * Get a snapshot of the current policy
130 * @return the {@link PolicyInfo} object representing an immutable snapshot
131 * of the policy state
133 public PolicyInfo getCurrentPolicy() {
138 * Get the normalized tenant for the given ID
142 * @return the {@link Tenant}
144 public IndexedTenant getTenant(TenantId tenant) {
145 TenantContext tc = resolvedTenants.get(tenant);
148 return tc.tenant.get();
151 public void registerActionDefinitions(ActionDefinitionId actionDefinitionId, ActionInstanceValidator validator) {
152 registeredActions.putIfAbsent(actionDefinitionId, validator);
155 * Register a listener to receive update events.
158 * the {@link PolicyListener} object to receive the update events
160 public PolicyScope registerListener(PolicyListener listener) {
161 PolicyScope ps = new PolicyScope(this, listener);
162 policyListenerScopes.add(ps);
168 * Remove the listener registered for the given {@link PolicyScope}.
171 * the scope to remove
172 * @see PolicyResolver#registerListener(PolicyListener)
174 public void removeListener(PolicyScope scope) {
175 policyListenerScopes.remove(scope);
183 * Atomically update the active policy and notify policy listeners of
187 * the new policy to set
188 * @param egConditions
189 * the map of endpoint groups to relevant condition sets
190 * @return the set of groups with updated policy
192 protected Set<EgKey> updatePolicy(Table<EgKey, EgKey, Policy> policyMap,
193 Map<EgKey, Set<ConditionSet>> egConditions,
194 List<PolicyScope> policyListenerScopes) {
195 PolicyInfo newPolicy = new PolicyInfo(policyMap, egConditions);
196 PolicyInfo oldPolicy = policy.getAndSet(newPolicy);
198 HashSet<EgKey> notifySet = new HashSet<>();
200 for (Cell<EgKey, EgKey, Policy> cell : newPolicy.getPolicyMap().cellSet()) {
201 Policy newp = cell.getValue();
203 if (oldPolicy != null)
204 oldp = oldPolicy.getPolicyMap().get(cell.getRowKey(),
205 cell.getColumnKey());
206 if (oldp == null || !newp.equals(oldp)) {
207 notifySet.add(cell.getRowKey());
208 notifySet.add(cell.getColumnKey());
211 if (oldPolicy != null) {
212 for (Cell<EgKey, EgKey, Policy> cell : oldPolicy.getPolicyMap().cellSet()) {
213 if (!newPolicy.getPolicyMap().contains(cell.getRowKey(),
214 cell.getColumnKey())) {
215 notifySet.add(cell.getRowKey());
216 notifySet.add(cell.getColumnKey());
224 * Notify the policy listeners about a set of updated groups
226 private void notifyListeners(Set<EgKey> updatedGroups) {
227 for (final PolicyScope scope : policyListenerScopes) {
228 Set<EgKey> filtered =
229 Sets.filter(updatedGroups, new Predicate<EgKey>() {
231 public boolean apply(EgKey input) {
232 return scope.contains(input.getTenantId(),
236 if (!filtered.isEmpty()) {
237 scope.getListener().policyUpdated(filtered);
243 * Subscribe the resolver to updates related to a particular tenant Make
244 * sure that this can't be called concurrently with subscribe
247 * the tenant ID to subscribe to
249 protected void subscribeTenant(TenantId tenantId) {
250 if (!resolvedTenants.containsKey(tenantId))
251 updateTenant(tenantId);
255 * Unsubscribe the resolver from updates related to a particular tenant Make
256 * sure that this can't be called concurrently with subscribe
259 * the tenant ID to subscribe to
261 protected void unsubscribeTenant(TenantId tenantId) {
262 TenantContext context = resolvedTenants.get(tenantId);
263 if (context != null) {
264 resolvedTenants.remove(tenantId);
265 context.registration.close();
269 private void updateTenant(final TenantId tenantId) {
270 if (dataProvider == null)
273 TenantContext context = resolvedTenants.get(tenantId);
274 if (context == null) {
275 ListenerRegistration<DataChangeListener> registration = null;
276 registration = dataProvider
277 .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
278 TenantUtils.tenantIid(tenantId),
279 new PolicyChangeListener(tenantId),
280 DataChangeScope.SUBTREE);
282 context = new TenantContext(registration);
283 TenantContext oldContext =
284 resolvedTenants.putIfAbsent(tenantId, context);
285 if (oldContext != null) {
286 // already registered in a different thread; just use the other
288 registration.close();
289 context = oldContext;
291 LOG.info("Added tenant {} to policy scope", tenantId);
295 // Resolve the new tenant and update atomically
296 final AtomicReference<IndexedTenant> tenantRef = context.tenant;
297 final IndexedTenant ot = tenantRef.get();
298 ReadOnlyTransaction transaction =
299 dataProvider.newReadOnlyTransaction();
300 final InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
301 ListenableFuture<Optional<Tenant>> unresolved;
303 unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
305 Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
307 public void onSuccess(Optional<Tenant> result) {
308 if (!result.isPresent()) {
309 LOG.warn("Tenant {} not found", tenantId);
312 Tenant t = InheritanceUtils.resolveTenant(result.get());
313 if (isValidTenant(t)) {
314 IndexedTenant it = new IndexedTenant(t);
315 if (!tenantRef.compareAndSet(ot, it)) {
316 // concurrent update of tenant policy. Retry
317 updateTenant(tenantId);
319 // Update the policy cache and notify listeners
320 WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
321 wt.put(LogicalDatastoreType.OPERATIONAL, tiid, t, true);
333 public void onFailure(Throwable t) {
334 LOG.error("Count not get tenant {}", tenantId, t);
339 protected void updatePolicy() {
341 Map<EgKey, Set<ConditionSet>> egConditions = new HashMap<>();
342 Set<IndexedTenant> indexedTenants = getIndexedTenants(resolvedTenants.values());
343 Table<EgKey, EgKey, Policy> policyMap = PolicyResolverUtils.resolvePolicy(indexedTenants, egConditions);
344 Set<EgKey> updatedGroups = updatePolicy(policyMap, egConditions, policyListenerScopes);
346 notifyListeners(updatedGroups);
347 } catch (Exception e) {
348 LOG.error("Failed to update policy", e);
352 private Set<IndexedTenant> getIndexedTenants(Collection<TenantContext> tenantCtxs) {
353 Set<IndexedTenant> result = new HashSet<>();
354 for (TenantContext tenant : tenantCtxs) {
355 IndexedTenant t = tenant.tenant.get();
362 protected static class TenantContext {
363 ListenerRegistration<DataChangeListener> registration;
365 AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
367 public TenantContext(ListenerRegistration<DataChangeListener> registration) {
369 this.registration = registration;
374 private class PolicyChangeListener implements DataChangeListener {
375 final TenantId tenantId;
377 public PolicyChangeListener(TenantId tenantId) {
379 this.tenantId = tenantId;
383 public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
384 updateTenant(tenantId);
389 private boolean isValidTenant(Tenant t) {
390 if(validActionInstances(t.getSubjectFeatureInstances().getActionInstance())) {
396 private boolean validActionInstances(List<ActionInstance> actionInstances) {
397 for(ActionInstance actionInstance : actionInstances) {
398 if(!(registeredActions.get(actionInstance.getActionDefinitionId()).isValid(actionInstance))) {
405 private boolean validContracts(List<Contract> contracts) {
406 for (Contract contract: contracts) {
407 validateSubjects(contract.getSubject());
412 private void validateSubjects(List<Subject> subjects) {
413 for(Subject subject: subjects) {
414 validateRules(subject.getRule());
419 private void validateRules(List<Rule> rules) {
423 private void validateActionRefs(List<ActionRef> actionRefs) {