2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.groupbasedpolicy.resolver;
10 import java.util.Collection;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.List;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.CopyOnWriteArrayList;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.atomic.AtomicReference;
22 import javax.annotation.concurrent.Immutable;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
31 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPoliciesBuilder;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.binding.DataObject;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import com.google.common.base.Optional;
45 import com.google.common.base.Predicate;
46 import com.google.common.collect.Sets;
47 import com.google.common.collect.Table;
48 import com.google.common.collect.Table.Cell;
49 import com.google.common.util.concurrent.FutureCallback;
50 import com.google.common.util.concurrent.Futures;
51 import com.google.common.util.concurrent.ListenableFuture;
53 import org.opendaylight.groupbasedpolicy.resolver.validator.PolicyValidator;
54 import org.opendaylight.groupbasedpolicy.resolver.validator.ValidationResult;
55 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
58 * The policy resolver is a utility for renderers to help in resolving
59 * group-based policy into a form that is easier to apply to the actual network.
61 * For any pair of endpoint groups, there is a set of rules that could apply to
62 * the endpoints on that group based on the policy configuration. The exact list
63 * of rules that apply to a given pair of endpoints depends on the conditions
64 * that are active on the endpoints.
66 * We need to be able to query against this policy model, enumerate the relevant
67 * classes of traffic and endpoints, and notify renderers when there are changes
68 * to policy as it applies to active sets of endpoints and endpoint groups.
70 * The policy resolver will maintain the necessary state for all tenants in its
71 * control domain, which is the set of tenants for which policy listeners have
75 public class PolicyResolver implements AutoCloseable {
77 private static final Logger LOG = LoggerFactory.getLogger(PolicyResolver.class);
79 private final DataBroker dataProvider;
80 private final ScheduledExecutorService executor;
83 * Keep track of the current relevant policy scopes.
85 protected CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
87 protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
90 * Store a policy object for each endpoint group pair. The table is stored
91 * with the key as (consumer, provider). Two endpoints could appear in both
92 * roles at the same time, in which case both policies would apply.
94 AtomicReference<PolicyInfo> policy = new AtomicReference<>();
97 * Store validators for ActionDefinitions from Renderers
100 protected ConcurrentMap<ActionDefinitionId, ActionInstanceValidator> registeredActions = new ConcurrentHashMap<>();
102 public PolicyResolver(DataBroker dataProvider,
103 ScheduledExecutorService executor) {
105 this.dataProvider = dataProvider;
106 this.executor = executor;
107 policyListenerScopes = new CopyOnWriteArrayList<>();
108 resolvedTenants = new ConcurrentHashMap<>();
109 LOG.debug("Initialized renderer common policy resolver");
116 public void close() throws Exception {
117 for (TenantContext ctx : resolvedTenants.values()) {
118 if (ctx.registration != null) {
119 ctx.registration.close();
124 // *************************
125 // PolicyResolver public API
126 // *************************
128 * Get a snapshot of the current policy
130 * @return the {@link PolicyInfo} object representing an immutable snapshot
131 * of the policy state
133 public PolicyInfo getCurrentPolicy() {
138 * Get the normalized tenant for the given ID
140 * @param tenant the tenant ID
141 * @return the {@link Tenant}
143 public IndexedTenant getTenant(TenantId tenant) {
144 TenantContext tc = resolvedTenants.get(tenant);
148 return tc.tenant.get();
151 public void registerActionDefinitions(ActionDefinitionId actionDefinitionId, ActionInstanceValidator validator) {
152 registeredActions.putIfAbsent(actionDefinitionId, validator);
156 * Register a listener to receive update events.
158 * @param listener the {@link PolicyListener} object to receive the update
161 public PolicyScope registerListener(PolicyListener listener) {
162 PolicyScope ps = new PolicyScope(this, listener);
163 policyListenerScopes.add(ps);
169 * Remove the listener registered for the given {@link PolicyScope}.
171 * @param scope the scope to remove
172 * @see PolicyResolver#registerListener(PolicyListener)
174 public void removeListener(PolicyScope scope) {
175 policyListenerScopes.remove(scope);
182 * Atomically update the active policy and notify policy listeners of
185 * @param policyMap the new policy to set
186 * @param egConditions the map of endpoint groups to relevant condition sets
187 * @return the set of groups with updated policy
189 protected Set<EgKey> updatePolicy(Table<EgKey, EgKey, Policy> policyMap,
190 Map<EgKey, Set<ConditionSet>> egConditions,
191 List<PolicyScope> policyListenerScopes) {
192 PolicyInfo newPolicy = new PolicyInfo(policyMap, egConditions);
193 PolicyInfo oldPolicy = policy.getAndSet(newPolicy);
195 HashSet<EgKey> notifySet = new HashSet<>();
197 for (Cell<EgKey, EgKey, Policy> cell : newPolicy.getPolicyMap().cellSet()) {
198 Policy newp = cell.getValue();
200 if (oldPolicy != null) {
201 oldp = oldPolicy.getPolicyMap().get(cell.getRowKey(),
202 cell.getColumnKey());
204 if (oldp == null || !newp.equals(oldp)) {
205 notifySet.add(cell.getRowKey());
206 notifySet.add(cell.getColumnKey());
209 if (oldPolicy != null) {
210 for (Cell<EgKey, EgKey, Policy> cell : oldPolicy.getPolicyMap().cellSet()) {
211 if (!newPolicy.getPolicyMap().contains(cell.getRowKey(),
212 cell.getColumnKey())) {
213 notifySet.add(cell.getRowKey());
214 notifySet.add(cell.getColumnKey());
222 * Notify the policy listeners about a set of updated groups
224 private void notifyListeners(Set<EgKey> updatedGroups) {
225 for (final PolicyScope scope : policyListenerScopes) {
227 = Sets.filter(updatedGroups, new Predicate<EgKey>() {
229 public boolean apply(EgKey input) {
230 return scope.contains(input.getTenantId(),
234 if (!filtered.isEmpty()) {
235 scope.getListener().policyUpdated(filtered);
241 * Subscribe the resolver to updates related to a particular tenant Make
242 * sure that this can't be called concurrently with subscribe
244 * @param tenantId the tenant ID to subscribe to
246 protected void subscribeTenant(TenantId tenantId) {
247 if (!resolvedTenants.containsKey(tenantId)) {
248 updateTenant(tenantId);
253 * Unsubscribe the resolver from updates related to a particular tenant Make
254 * sure that this can't be called concurrently with subscribe
256 * @param tenantId the tenant ID to subscribe to
258 protected void unsubscribeTenant(TenantId tenantId) {
259 TenantContext context = resolvedTenants.get(tenantId);
260 if (context != null) {
261 resolvedTenants.remove(tenantId);
262 context.registration.close();
266 private void updateTenant(final TenantId tenantId) {
267 if (dataProvider == null) {
271 TenantContext context = resolvedTenants.get(tenantId);
272 if (context == null) {
273 ListenerRegistration<DataChangeListener> registration = null;
274 registration = dataProvider
275 .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
276 TenantUtils.tenantIid(tenantId),
277 new PolicyChangeListener(tenantId),
278 DataChangeScope.SUBTREE);
280 context = new TenantContext(registration);
281 TenantContext oldContext
282 = resolvedTenants.putIfAbsent(tenantId, context);
283 if (oldContext != null) {
284 // already registered in a different thread; just use the other
286 registration.close();
287 context = oldContext;
289 LOG.info("Added tenant {} to policy scope", tenantId);
293 // Resolve the new tenant and update atomically
294 final AtomicReference<IndexedTenant> tenantRef = context.tenant;
295 final IndexedTenant ot = tenantRef.get();
296 ReadOnlyTransaction transaction
297 = dataProvider.newReadOnlyTransaction();
298 final InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
299 ListenableFuture<Optional<Tenant>> unresolved;
301 unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
303 Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
305 public void onSuccess(Optional<Tenant> result) {
306 if (!result.isPresent()) {
307 LOG.info("Tenant {} not found in CONF; check&delete from OPER", tenantId);
308 deleteOperTenantIfExists(tiid, tenantId);
312 Tenant t = InheritanceUtils.resolveTenant(result.get());
313 if (isValidTenant(t)) {
314 IndexedTenant it = new IndexedTenant(t);
315 if (!tenantRef.compareAndSet(ot, it)) {
316 // concurrent update of tenant policy. Retry
317 updateTenant(tenantId);
319 // Update the policy cache and notify listeners
320 WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
321 wt.put(LogicalDatastoreType.OPERATIONAL, tiid, t, true);
329 public void onFailure(Throwable t) {
330 LOG.error("Count not get tenant {}", tenantId, t);
335 private void deleteOperTenantIfExists(final InstanceIdentifier<Tenant> tiid, final TenantId tenantId) {
336 final ReadWriteTransaction rwTx = dataProvider.newReadWriteTransaction();
338 ListenableFuture<Optional<Tenant>> readFuture = rwTx.read(LogicalDatastoreType.OPERATIONAL, tiid);
339 Futures.addCallback(readFuture, new FutureCallback<Optional<Tenant>>() {
341 public void onSuccess(Optional<Tenant> result) {
342 if(result.isPresent()){
343 TenantContext tenantContext = resolvedTenants.get(tenantId);
344 tenantContext.tenant.set(null);
345 rwTx.delete(LogicalDatastoreType.OPERATIONAL, tiid);
352 public void onFailure(Throwable t) {
353 LOG.error("Failed to read operational datastore: {}", t);
360 protected void updatePolicy() {
362 Map<EgKey, Set<ConditionSet>> egConditions = new HashMap<>();
363 Set<IndexedTenant> indexedTenants = getIndexedTenants(resolvedTenants.values());
364 Table<EgKey, EgKey, Policy> policyMap = PolicyResolverUtils.resolvePolicy(indexedTenants, egConditions);
365 Set<EgKey> updatedGroups = updatePolicy(policyMap, egConditions, policyListenerScopes);
366 updatePolicyInDataStore(policyMap);
367 //TODO the following will be removed when the policyInfo datastore is completed
368 notifyListeners(updatedGroups);
369 } catch (Exception e) {
370 LOG.error("Failed to update policy", e);
374 private void updatePolicyInDataStore(Table<EgKey, EgKey, Policy> policyMap) {
375 if (dataProvider == null) {
376 LOG.error("Couldn't Write Resolved Tenants Policy Info to Datastore because dataProvider is NULL");
379 ResolvedPolicies resolvedPolicies = new ResolvedPoliciesBuilder().setResolvedPolicy(
380 PolicyInfoUtils.buildResolvedPolicy(policyMap)).build();
382 WriteTransaction t = dataProvider.newWriteOnlyTransaction();
383 t.put(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(ResolvedPolicies.class).build(),
384 resolvedPolicies, true);
385 if (DataStoreHelper.submitToDs(t)) {
386 LOG.debug("Wrote resolved policies to Datastore");
388 LOG.error("Failed to write resolved policies to Datastore.");
391 private Set<IndexedTenant> getIndexedTenants(Collection<TenantContext> tenantCtxs) {
392 Set<IndexedTenant> result = new HashSet<>();
393 for (TenantContext tenant : tenantCtxs) {
394 IndexedTenant t = tenant.tenant.get();
402 protected static class TenantContext {
404 ListenerRegistration<DataChangeListener> registration;
406 AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
408 public TenantContext(ListenerRegistration<DataChangeListener> registration) {
410 this.registration = registration;
415 private class PolicyChangeListener implements DataChangeListener {
417 final TenantId tenantId;
419 public PolicyChangeListener(TenantId tenantId) {
421 this.tenantId = tenantId;
425 public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
426 updateTenant(tenantId);
431 private boolean isValidTenant(Tenant t) {
432 ValidationResult validationResult = PolicyValidator.validate(t, this);
433 if (validationResult == null) {
436 return validationResult.getResult().getValue();
439 public ActionInstanceValidator getActionInstanceValidator(ActionDefinitionId actionDefinitionId) {
440 if (registeredActions == null) {
443 return registeredActions.get(actionDefinitionId);