6afeb3ec847ca307046b179279f302ca83d600f6
[groupbasedpolicy.git] / groupbasedpolicy / src / main / java / org / opendaylight / groupbasedpolicy / resolver / PolicyResolver.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.resolver;
10
11 import java.util.Collection;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.CopyOnWriteArrayList;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.atomic.AtomicReference;
22
23 import javax.annotation.concurrent.Immutable;
24
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
27 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.binding.DataObject;
36 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import com.google.common.base.Optional;
41 import com.google.common.base.Predicate;
42 import com.google.common.collect.Sets;
43 import com.google.common.collect.Table;
44 import com.google.common.collect.Table.Cell;
45 import com.google.common.util.concurrent.FutureCallback;
46 import com.google.common.util.concurrent.Futures;
47 import com.google.common.util.concurrent.ListenableFuture;
48
49 /**
50  * The policy resolver is a utility for renderers to help in resolving
51  * group-based policy into a form that is easier to apply to the actual network.
52  *
53  * For any pair of endpoint groups, there is a set of rules that could apply to
54  * the endpoints on that group based on the policy configuration. The exact list
55  * of rules that apply to a given pair of endpoints depends on the conditions
56  * that are active on the endpoints.
57  *
58  * We need to be able to query against this policy model, enumerate the relevant
59  * classes of traffic and endpoints, and notify renderers when there are changes
60  * to policy as it applies to active sets of endpoints and endpoint groups.
61  *
62  * The policy resolver will maintain the necessary state for all tenants in its
63  * control domain, which is the set of tenants for which policy listeners have
64  * been registered.
65  *
66  */
67 public class PolicyResolver implements AutoCloseable {
68     private static final Logger LOG = LoggerFactory.getLogger(PolicyResolver.class);
69
70     private final DataBroker dataProvider;
71     private final ScheduledExecutorService executor;
72
73     /**
74      * Keep track of the current relevant policy scopes.
75      */
76     protected CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
77
78     protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
79
80     /**
81      * Store a policy object for each endpoint group pair. The table is stored
82      * with the key as (consumer, provider). Two endpoints could appear in both
83      * roles at the same time, in which case both policies would apply.
84      */
85     AtomicReference<PolicyInfo> policy = new AtomicReference<>();
86
87     public PolicyResolver(DataBroker dataProvider,
88             ScheduledExecutorService executor) {
89         super();
90         this.dataProvider = dataProvider;
91         this.executor = executor;
92         policyListenerScopes = new CopyOnWriteArrayList<>();
93         resolvedTenants = new ConcurrentHashMap<>();
94         LOG.debug("Initialized renderer common policy resolver");
95     }
96
97     // *************
98     // AutoCloseable
99     // *************
100
101     @Override
102     public void close() throws Exception {
103         for (TenantContext ctx : resolvedTenants.values()) {
104             if (ctx.registration != null)
105                 ctx.registration.close();
106         }
107     }
108
109     // *************************
110     // PolicyResolver public API
111     // *************************
112
113     /**
114      * Get a snapshot of the current policy
115      *
116      * @return the {@link PolicyInfo} object representing an immutable snapshot
117      *         of the policy state
118      */
119     public PolicyInfo getCurrentPolicy() {
120         return policy.get();
121     }
122
123     /**
124      * Get the normalized tenant for the given ID
125      *
126      * @param tenant
127      *            the tenant ID
128      * @return the {@link Tenant}
129      */
130     public IndexedTenant getTenant(TenantId tenant) {
131         TenantContext tc = resolvedTenants.get(tenant);
132         if (tc == null)
133             return null;
134         return tc.tenant.get();
135     }
136
137     /**
138      * Register a listener to receive update events.
139      *
140      * @param listener
141      *            the {@link PolicyListener} object to receive the update events
142      */
143     public PolicyScope registerListener(PolicyListener listener) {
144         PolicyScope ps = new PolicyScope(this, listener);
145         policyListenerScopes.add(ps);
146
147         return ps;
148     }
149
150     /**
151      * Remove the listener registered for the given {@link PolicyScope}.
152      *
153      * @param scope
154      *            the scope to remove
155      * @see PolicyResolver#registerListener(PolicyListener)
156      */
157     public void removeListener(PolicyScope scope) {
158         policyListenerScopes.remove(scope);
159     }
160
161     // **************
162     // Implementation
163     // **************
164
165     /**
166      * Atomically update the active policy and notify policy listeners of
167      * relevant changes
168      *
169      * @param policyMap
170      *            the new policy to set
171      * @param egConditions
172      *            the map of endpoint groups to relevant condition sets
173      * @return the set of groups with updated policy
174      */
175     protected Set<EgKey> updatePolicy(Table<EgKey, EgKey, Policy> policyMap,
176             Map<EgKey, Set<ConditionSet>> egConditions,
177             List<PolicyScope> policyListenerScopes) {
178         PolicyInfo newPolicy = new PolicyInfo(policyMap, egConditions);
179         PolicyInfo oldPolicy = policy.getAndSet(newPolicy);
180
181         HashSet<EgKey> notifySet = new HashSet<>();
182
183         for (Cell<EgKey, EgKey, Policy> cell : newPolicy.getPolicyMap().cellSet()) {
184             Policy newp = cell.getValue();
185             Policy oldp = null;
186             if (oldPolicy != null)
187                 oldp = oldPolicy.getPolicyMap().get(cell.getRowKey(),
188                         cell.getColumnKey());
189             if (oldp == null || !newp.equals(oldp)) {
190                 notifySet.add(cell.getRowKey());
191                 notifySet.add(cell.getColumnKey());
192             }
193         }
194         if (oldPolicy != null) {
195             for (Cell<EgKey, EgKey, Policy> cell : oldPolicy.getPolicyMap().cellSet()) {
196                 if (!newPolicy.getPolicyMap().contains(cell.getRowKey(),
197                         cell.getColumnKey())) {
198                     notifySet.add(cell.getRowKey());
199                     notifySet.add(cell.getColumnKey());
200                 }
201             }
202         }
203         return notifySet;
204     }
205
206     /**
207      * Notify the policy listeners about a set of updated groups
208      */
209     private void notifyListeners(Set<EgKey> updatedGroups) {
210         for (final PolicyScope scope : policyListenerScopes) {
211             Set<EgKey> filtered =
212                     Sets.filter(updatedGroups, new Predicate<EgKey>() {
213                         @Override
214                         public boolean apply(EgKey input) {
215                             return scope.contains(input.getTenantId(),
216                                     input.getEgId());
217                         }
218                     });
219             if (!filtered.isEmpty()) {
220                 scope.getListener().policyUpdated(filtered);
221             }
222         }
223     }
224
225     /**
226      * Subscribe the resolver to updates related to a particular tenant Make
227      * sure that this can't be called concurrently with subscribe
228      *
229      * @param tenantId
230      *            the tenant ID to subscribe to
231      */
232     protected void subscribeTenant(TenantId tenantId) {
233         if (!resolvedTenants.containsKey(tenantId))
234             updateTenant(tenantId);
235     }
236
237     /**
238      * Unsubscribe the resolver from updates related to a particular tenant Make
239      * sure that this can't be called concurrently with subscribe
240      *
241      * @param tenantId
242      *            the tenant ID to subscribe to
243      */
244     protected void unsubscribeTenant(TenantId tenantId) {
245         TenantContext context = resolvedTenants.get(tenantId);
246         if (context != null) {
247             resolvedTenants.remove(tenantId);
248             context.registration.close();
249         }
250     }
251
252     private void updateTenant(final TenantId tenantId) {
253         if (dataProvider == null)
254             return;
255
256         TenantContext context = resolvedTenants.get(tenantId);
257         if (context == null) {
258             ListenerRegistration<DataChangeListener> registration = null;
259             registration = dataProvider
260                     .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
261                             TenantUtils.tenantIid(tenantId),
262                             new PolicyChangeListener(tenantId),
263                             DataChangeScope.SUBTREE);
264
265             context = new TenantContext(registration);
266             TenantContext oldContext =
267                     resolvedTenants.putIfAbsent(tenantId, context);
268             if (oldContext != null) {
269                 // already registered in a different thread; just use the other
270                 // context
271                 registration.close();
272                 context = oldContext;
273             } else {
274                 LOG.info("Added tenant {} to policy scope", tenantId);
275             }
276         }
277
278         // Resolve the new tenant and update atomically
279         final AtomicReference<IndexedTenant> tenantRef = context.tenant;
280         final IndexedTenant ot = tenantRef.get();
281         ReadOnlyTransaction transaction =
282                 dataProvider.newReadOnlyTransaction();
283         final InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
284         ListenableFuture<Optional<Tenant>> unresolved;
285
286         unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
287
288         Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
289             @Override
290             public void onSuccess(Optional<Tenant> result) {
291                 if (!result.isPresent()) {
292                     LOG.warn("Tenant {} not found", tenantId);
293                 }
294
295                 Tenant t = InheritanceUtils.resolveTenant(result.get());
296                 IndexedTenant it = new IndexedTenant(t);
297                 if (!tenantRef.compareAndSet(ot, it)) {
298                     // concurrent update of tenant policy. Retry
299                     updateTenant(tenantId);
300                 } else {
301                     // Update the policy cache and notify listeners
302                     WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
303                     wt.put(LogicalDatastoreType.OPERATIONAL, tiid, t, true);
304                     wt.submit();
305                     updatePolicy();
306                 }
307             }
308
309             @Override
310             public void onFailure(Throwable t) {
311                 LOG.error("Count not get tenant {}", tenantId, t);
312             }
313         }, executor);
314     }
315
316     protected void updatePolicy() {
317         try {
318             Map<EgKey, Set<ConditionSet>> egConditions = new HashMap<>();
319             Set<IndexedTenant> indexedTenants = getIndexedTenants(resolvedTenants.values());
320             Table<EgKey, EgKey, Policy> policyMap = PolicyResolverUtils.resolvePolicy(indexedTenants, egConditions);
321             Set<EgKey> updatedGroups = updatePolicy(policyMap, egConditions, policyListenerScopes);
322
323             notifyListeners(updatedGroups);
324         } catch (Exception e) {
325             LOG.error("Failed to update policy", e);
326         }
327     }
328
329     private Set<IndexedTenant> getIndexedTenants(Collection<TenantContext> tenantCtxs) {
330         Set<IndexedTenant> result = new HashSet<>();
331         for (TenantContext tenant : tenantCtxs) {
332             IndexedTenant t = tenant.tenant.get();
333             if (t != null)
334                 result.add(t);
335         }
336         return result;
337     }
338
339     protected static class TenantContext {
340         ListenerRegistration<DataChangeListener> registration;
341
342         AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
343
344         public TenantContext(ListenerRegistration<DataChangeListener> registration) {
345             super();
346             this.registration = registration;
347         }
348     }
349
350     @Immutable
351     private class PolicyChangeListener implements DataChangeListener {
352         final TenantId tenantId;
353
354         public PolicyChangeListener(TenantId tenantId) {
355             super();
356             this.tenantId = tenantId;
357         }
358
359         @Override
360         public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
361             updateTenant(tenantId);
362         }
363
364     }
365 }