2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.resolver;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.concurrent.Immutable;

import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.common.collect.Table.Cell;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
/**
 * The policy resolver is a utility for renderers to help in resolving
 * group-based policy into a form that is easier to apply to the actual network.
 *
 * For any pair of endpoint groups, there is a set of rules that could apply to
 * the endpoints in those groups based on the policy configuration. The exact
 * list of rules that apply to a given pair of endpoints depends on the
 * conditions that are active on the endpoints.
 *
 * We need to be able to query against this policy model, enumerate the relevant
 * classes of traffic and endpoints, and notify renderers when there are changes
 * to policy as it applies to active sets of endpoints and endpoint groups.
 *
 * The policy resolver will maintain the necessary state for all tenants in its
 * control domain, which is the set of tenants for which policy listeners have
 * been registered.
 */
67 public class PolicyResolver implements AutoCloseable {
/** Logger shared by all instances of this class. */
private static final Logger LOG = LoggerFactory.getLogger(PolicyResolver.class);

/**
 * Data broker used to register tenant data change listeners and to create
 * read and write transactions. Tenant updates are skipped when this is
 * {@code null}.
 */
private final DataBroker dataProvider;
/** Executor service handed to the resolver at construction time. */
private final ScheduledExecutorService executor;

/**
 * Keep track of the current relevant policy scopes.
 */
protected CopyOnWriteArrayList<PolicyScope> policyListenerScopes;

/**
 * Per-tenant state (listener registration and most recently resolved
 * tenant) for every tenant the resolver is subscribed to, keyed by tenant
 * ID.
 */
protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;

/**
 * Store a policy object for each endpoint group pair. The table is stored
 * with the key as (consumer, provider). Two endpoints could appear in both
 * roles at the same time, in which case both policies would apply.
 *
 * The reference holds the current {@link PolicyInfo} snapshot and stays
 * empty until the first policy has been set.
 */
AtomicReference<PolicyInfo> policy = new AtomicReference<>();
/**
 * Create a policy resolver that starts out with no registered policy
 * listeners and no resolved tenants.
 *
 * @param dataProvider
 *            the data broker used to listen for and read tenant data
 * @param executor
 *            the executor service made available to the resolver
 */
public PolicyResolver(DataBroker dataProvider,
                      ScheduledExecutorService executor) {
    // The superclass constructor is invoked implicitly.
    this.policyListenerScopes = new CopyOnWriteArrayList<>();
    this.resolvedTenants = new ConcurrentHashMap<>();
    this.executor = executor;
    this.dataProvider = dataProvider;
    LOG.debug("Initialized renderer common policy resolver");
}
102 public void close() throws Exception {
103 for (TenantContext ctx : resolvedTenants.values()) {
104 if (ctx.registration != null)
105 ctx.registration.close();
109 // *************************
110 // PolicyResolver public API
111 // *************************
114 * Get a snapshot of the current policy
116 * @return the {@link PolicyInfo} object representing an immutable snapshot
117 * of the policy state
119 public PolicyInfo getCurrentPolicy() {
124 * Get the normalized tenant for the given ID
128 * @return the {@link Tenant}
130 public IndexedTenant getTenant(TenantId tenant) {
131 TenantContext tc = resolvedTenants.get(tenant);
134 return tc.tenant.get();
138 * Register a listener to receive update events.
141 * the {@link PolicyListener} object to receive the update events
143 public PolicyScope registerListener(PolicyListener listener) {
144 PolicyScope ps = new PolicyScope(this, listener);
145 policyListenerScopes.add(ps);
151 * Remove the listener registered for the given {@link PolicyScope}.
154 * the scope to remove
155 * @see PolicyResolver#registerListener(PolicyListener)
157 public void removeListener(PolicyScope scope) {
158 policyListenerScopes.remove(scope);
166 * Atomically update the active policy and notify policy listeners of
170 * the new policy to set
171 * @param egConditions
172 * the map of endpoint groups to relevant condition sets
173 * @return the set of groups with updated policy
175 protected Set<EgKey> updatePolicy(Table<EgKey, EgKey, Policy> policyMap,
176 Map<EgKey, Set<ConditionSet>> egConditions,
177 List<PolicyScope> policyListenerScopes) {
178 PolicyInfo newPolicy = new PolicyInfo(policyMap, egConditions);
179 PolicyInfo oldPolicy = policy.getAndSet(newPolicy);
181 HashSet<EgKey> notifySet = new HashSet<>();
183 for (Cell<EgKey, EgKey, Policy> cell : newPolicy.getPolicyMap().cellSet()) {
184 Policy newp = cell.getValue();
186 if (oldPolicy != null)
187 oldp = oldPolicy.getPolicyMap().get(cell.getRowKey(),
188 cell.getColumnKey());
189 if (oldp == null || !newp.equals(oldp)) {
190 notifySet.add(cell.getRowKey());
191 notifySet.add(cell.getColumnKey());
194 if (oldPolicy != null) {
195 for (Cell<EgKey, EgKey, Policy> cell : oldPolicy.getPolicyMap().cellSet()) {
196 if (!newPolicy.getPolicyMap().contains(cell.getRowKey(),
197 cell.getColumnKey())) {
198 notifySet.add(cell.getRowKey());
199 notifySet.add(cell.getColumnKey());
207 * Notify the policy listeners about a set of updated groups
209 private void notifyListeners(Set<EgKey> updatedGroups) {
210 for (final PolicyScope scope : policyListenerScopes) {
211 Set<EgKey> filtered =
212 Sets.filter(updatedGroups, new Predicate<EgKey>() {
214 public boolean apply(EgKey input) {
215 return scope.contains(input.getTenantId(),
219 if (!filtered.isEmpty()) {
220 scope.getListener().policyUpdated(filtered);
226 * Subscribe the resolver to updates related to a particular tenant Make
227 * sure that this can't be called concurrently with subscribe
230 * the tenant ID to subscribe to
232 protected void subscribeTenant(TenantId tenantId) {
233 if (!resolvedTenants.containsKey(tenantId))
234 updateTenant(tenantId);
238 * Unsubscribe the resolver from updates related to a particular tenant Make
239 * sure that this can't be called concurrently with subscribe
242 * the tenant ID to subscribe to
244 protected void unsubscribeTenant(TenantId tenantId) {
245 TenantContext context = resolvedTenants.get(tenantId);
246 if (context != null) {
247 resolvedTenants.remove(tenantId);
248 context.registration.close();
252 private void updateTenant(final TenantId tenantId) {
253 if (dataProvider == null)
256 TenantContext context = resolvedTenants.get(tenantId);
257 if (context == null) {
258 ListenerRegistration<DataChangeListener> registration = null;
259 registration = dataProvider
260 .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
261 TenantUtils.tenantIid(tenantId),
262 new PolicyChangeListener(tenantId),
263 DataChangeScope.SUBTREE);
265 context = new TenantContext(registration);
266 TenantContext oldContext =
267 resolvedTenants.putIfAbsent(tenantId, context);
268 if (oldContext != null) {
269 // already registered in a different thread; just use the other
271 registration.close();
272 context = oldContext;
274 LOG.info("Added tenant {} to policy scope", tenantId);
278 // Resolve the new tenant and update atomically
279 final AtomicReference<IndexedTenant> tenantRef = context.tenant;
280 final IndexedTenant ot = tenantRef.get();
281 ReadOnlyTransaction transaction =
282 dataProvider.newReadOnlyTransaction();
283 final InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
284 ListenableFuture<Optional<Tenant>> unresolved;
286 unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
288 Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
290 public void onSuccess(Optional<Tenant> result) {
291 if (!result.isPresent()) {
292 LOG.warn("Tenant {} not found", tenantId);
295 Tenant t = InheritanceUtils.resolveTenant(result.get());
296 IndexedTenant it = new IndexedTenant(t);
297 if (!tenantRef.compareAndSet(ot, it)) {
298 // concurrent update of tenant policy. Retry
299 updateTenant(tenantId);
301 // Update the policy cache and notify listeners
302 WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
303 wt.put(LogicalDatastoreType.OPERATIONAL, tiid, t, true);
310 public void onFailure(Throwable t) {
311 LOG.error("Count not get tenant {}", tenantId, t);
316 protected void updatePolicy() {
318 Map<EgKey, Set<ConditionSet>> egConditions = new HashMap<>();
319 Set<IndexedTenant> indexedTenants = getIndexedTenants(resolvedTenants.values());
320 Table<EgKey, EgKey, Policy> policyMap = PolicyResolverUtils.resolvePolicy(indexedTenants, egConditions);
321 Set<EgKey> updatedGroups = updatePolicy(policyMap, egConditions, policyListenerScopes);
323 notifyListeners(updatedGroups);
324 } catch (Exception e) {
325 LOG.error("Failed to update policy", e);
329 private Set<IndexedTenant> getIndexedTenants(Collection<TenantContext> tenantCtxs) {
330 Set<IndexedTenant> result = new HashSet<>();
331 for (TenantContext tenant : tenantCtxs) {
332 IndexedTenant t = tenant.tenant.get();
339 protected static class TenantContext {
340 ListenerRegistration<DataChangeListener> registration;
342 AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
344 public TenantContext(ListenerRegistration<DataChangeListener> registration) {
346 this.registration = registration;
351 private class PolicyChangeListener implements DataChangeListener {
352 final TenantId tenantId;
354 public PolicyChangeListener(TenantId tenantId) {
356 this.tenantId = tenantId;
360 public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
361 updateTenant(tenantId);