Merge "Bug 3746 added check if node is also in oper DS"
[groupbasedpolicy.git] / groupbasedpolicy / src / main / java / org / opendaylight / groupbasedpolicy / resolver / PolicyResolver.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.groupbasedpolicy.resolver;
9
10 import java.util.Collection;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.CopyOnWriteArrayList;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.atomic.AtomicReference;
21
22 import javax.annotation.concurrent.Immutable;
23
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.has.action.refs.ActionRef;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.Contract;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.contract.Subject;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.contract.subject.Rule;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.binding.DataObject;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.google.common.base.Optional;
45 import com.google.common.base.Predicate;
46 import com.google.common.collect.Sets;
47 import com.google.common.collect.Table;
48 import com.google.common.collect.Table.Cell;
49 import com.google.common.util.concurrent.FutureCallback;
50 import com.google.common.util.concurrent.Futures;
51 import com.google.common.util.concurrent.ListenableFuture;
52 import org.opendaylight.groupbasedpolicy.resolver.validator.PolicyValidator;
53 import org.opendaylight.groupbasedpolicy.resolver.validator.ValidationResult;
54
55 /**
56  * The policy resolver is a utility for renderers to help in resolving
57  * group-based policy into a form that is easier to apply to the actual network.
58  *
59  * For any pair of endpoint groups, there is a set of rules that could apply to
60  * the endpoints on that group based on the policy configuration. The exact list
61  * of rules that apply to a given pair of endpoints depends on the conditions
62  * that are active on the endpoints.
63  *
64  * We need to be able to query against this policy model, enumerate the relevant
65  * classes of traffic and endpoints, and notify renderers when there are changes
66  * to policy as it applies to active sets of endpoints and endpoint groups.
67  *
68  * The policy resolver will maintain the necessary state for all tenants in its
69  * control domain, which is the set of tenants for which policy listeners have
70  * been registered.
71  *
72  */
73 public class PolicyResolver implements AutoCloseable {
74
75     private static final Logger LOG = LoggerFactory.getLogger(PolicyResolver.class);
76
77     private final DataBroker dataProvider;
78     private final ScheduledExecutorService executor;
79
80     /**
81      * Keep track of the current relevant policy scopes.
82      */
83     protected CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
84
85     protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
86
87     /**
88      * Store a policy object for each endpoint group pair. The table is stored
89      * with the key as (consumer, provider). Two endpoints could appear in both
90      * roles at the same time, in which case both policies would apply.
91      */
92     AtomicReference<PolicyInfo> policy = new AtomicReference<>();
93
94     /*
95      * Store validators for ActionDefinitions from Renderers
96      *
97      */
98     protected ConcurrentMap<ActionDefinitionId, ActionInstanceValidator> registeredActions = new ConcurrentHashMap<>();
99
100     public PolicyResolver(DataBroker dataProvider,
101             ScheduledExecutorService executor) {
102         super();
103         this.dataProvider = dataProvider;
104         this.executor = executor;
105         policyListenerScopes = new CopyOnWriteArrayList<>();
106         resolvedTenants = new ConcurrentHashMap<>();
107         LOG.debug("Initialized renderer common policy resolver");
108     }
109
110     // *************
111     // AutoCloseable
112     // *************
113     @Override
114     public void close() throws Exception {
115         for (TenantContext ctx : resolvedTenants.values()) {
116             if (ctx.registration != null) {
117                 ctx.registration.close();
118             }
119         }
120     }
121
122     // *************************
123     // PolicyResolver public API
124     // *************************
125     /**
126      * Get a snapshot of the current policy
127      *
128      * @return the {@link PolicyInfo} object representing an immutable snapshot
129      * of the policy state
130      */
131     public PolicyInfo getCurrentPolicy() {
132         return policy.get();
133     }
134
135     /**
136      * Get the normalized tenant for the given ID
137      *
138      * @param tenant the tenant ID
139      * @return the {@link Tenant}
140      */
141     public IndexedTenant getTenant(TenantId tenant) {
142         TenantContext tc = resolvedTenants.get(tenant);
143         if (tc == null) {
144             return null;
145         }
146         return tc.tenant.get();
147     }
148
149     public void registerActionDefinitions(ActionDefinitionId actionDefinitionId, ActionInstanceValidator validator) {
150         registeredActions.putIfAbsent(actionDefinitionId, validator);
151     }
152
153     /**
154      * Register a listener to receive update events.
155      *
156      * @param listener the {@link PolicyListener} object to receive the update
157      * events
158      */
159     public PolicyScope registerListener(PolicyListener listener) {
160         PolicyScope ps = new PolicyScope(this, listener);
161         policyListenerScopes.add(ps);
162
163         return ps;
164     }
165
166     /**
167      * Remove the listener registered for the given {@link PolicyScope}.
168      *
169      * @param scope the scope to remove
170      * @see PolicyResolver#registerListener(PolicyListener)
171      */
172     public void removeListener(PolicyScope scope) {
173         policyListenerScopes.remove(scope);
174     }
175
176     // **************
177     // Implementation
178     // **************
179     /**
180      * Atomically update the active policy and notify policy listeners of
181      * relevant changes
182      *
183      * @param policyMap the new policy to set
184      * @param egConditions the map of endpoint groups to relevant condition sets
185      * @return the set of groups with updated policy
186      */
187     protected Set<EgKey> updatePolicy(Table<EgKey, EgKey, Policy> policyMap,
188             Map<EgKey, Set<ConditionSet>> egConditions,
189             List<PolicyScope> policyListenerScopes) {
190         PolicyInfo newPolicy = new PolicyInfo(policyMap, egConditions);
191         PolicyInfo oldPolicy = policy.getAndSet(newPolicy);
192
193         HashSet<EgKey> notifySet = new HashSet<>();
194
195         for (Cell<EgKey, EgKey, Policy> cell : newPolicy.getPolicyMap().cellSet()) {
196             Policy newp = cell.getValue();
197             Policy oldp = null;
198             if (oldPolicy != null) {
199                 oldp = oldPolicy.getPolicyMap().get(cell.getRowKey(),
200                         cell.getColumnKey());
201             }
202             if (oldp == null || !newp.equals(oldp)) {
203                 notifySet.add(cell.getRowKey());
204                 notifySet.add(cell.getColumnKey());
205             }
206         }
207         if (oldPolicy != null) {
208             for (Cell<EgKey, EgKey, Policy> cell : oldPolicy.getPolicyMap().cellSet()) {
209                 if (!newPolicy.getPolicyMap().contains(cell.getRowKey(),
210                         cell.getColumnKey())) {
211                     notifySet.add(cell.getRowKey());
212                     notifySet.add(cell.getColumnKey());
213                 }
214             }
215         }
216         return notifySet;
217     }
218
219     /**
220      * Notify the policy listeners about a set of updated groups
221      */
222     private void notifyListeners(Set<EgKey> updatedGroups) {
223         for (final PolicyScope scope : policyListenerScopes) {
224             Set<EgKey> filtered
225                     = Sets.filter(updatedGroups, new Predicate<EgKey>() {
226                         @Override
227                         public boolean apply(EgKey input) {
228                             return scope.contains(input.getTenantId(),
229                                     input.getEgId());
230                         }
231                     });
232             if (!filtered.isEmpty()) {
233                 scope.getListener().policyUpdated(filtered);
234             }
235         }
236     }
237
238     /**
239      * Subscribe the resolver to updates related to a particular tenant Make
240      * sure that this can't be called concurrently with subscribe
241      *
242      * @param tenantId the tenant ID to subscribe to
243      */
244     protected void subscribeTenant(TenantId tenantId) {
245         if (!resolvedTenants.containsKey(tenantId)) {
246             updateTenant(tenantId);
247         }
248     }
249
250     /**
251      * Unsubscribe the resolver from updates related to a particular tenant Make
252      * sure that this can't be called concurrently with subscribe
253      *
254      * @param tenantId the tenant ID to subscribe to
255      */
256     protected void unsubscribeTenant(TenantId tenantId) {
257         TenantContext context = resolvedTenants.get(tenantId);
258         if (context != null) {
259             resolvedTenants.remove(tenantId);
260             context.registration.close();
261         }
262     }
263
264     private void updateTenant(final TenantId tenantId) {
265         if (dataProvider == null) {
266             return;
267         }
268
269         TenantContext context = resolvedTenants.get(tenantId);
270         if (context == null) {
271             ListenerRegistration<DataChangeListener> registration = null;
272             registration = dataProvider
273                     .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
274                             TenantUtils.tenantIid(tenantId),
275                             new PolicyChangeListener(tenantId),
276                             DataChangeScope.SUBTREE);
277
278             context = new TenantContext(registration);
279             TenantContext oldContext
280                     = resolvedTenants.putIfAbsent(tenantId, context);
281             if (oldContext != null) {
282                 // already registered in a different thread; just use the other
283                 // context
284                 registration.close();
285                 context = oldContext;
286             } else {
287                 LOG.info("Added tenant {} to policy scope", tenantId);
288             }
289         }
290
291         // Resolve the new tenant and update atomically
292         final AtomicReference<IndexedTenant> tenantRef = context.tenant;
293         final IndexedTenant ot = tenantRef.get();
294         ReadOnlyTransaction transaction
295                 = dataProvider.newReadOnlyTransaction();
296         final InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
297         ListenableFuture<Optional<Tenant>> unresolved;
298
299         unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
300
301         Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
302             @Override
303             public void onSuccess(Optional<Tenant> result) {
304                 if (!result.isPresent()) {
305                     LOG.warn("Tenant {} not found", tenantId);
306                 }
307
308                 Tenant t = InheritanceUtils.resolveTenant(result.get());
309                 if (isValidTenant(t)) {
310                     IndexedTenant it = new IndexedTenant(t);
311                     if (!tenantRef.compareAndSet(ot, it)) {
312                         // concurrent update of tenant policy. Retry
313                         updateTenant(tenantId);
314                     } else {
315                         // Update the policy cache and notify listeners
316                         WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
317                         wt.put(LogicalDatastoreType.OPERATIONAL, tiid, t, true);
318                         wt.submit();
319                         updatePolicy();
320                     }
321                 }
322             }
323
324             @Override
325             public void onFailure(Throwable t) {
326                 LOG.error("Count not get tenant {}", tenantId, t);
327             }
328         }, executor);
329     }
330
331     protected void updatePolicy() {
332         try {
333             Map<EgKey, Set<ConditionSet>> egConditions = new HashMap<>();
334             Set<IndexedTenant> indexedTenants = getIndexedTenants(resolvedTenants.values());
335             Table<EgKey, EgKey, Policy> policyMap = PolicyResolverUtils.resolvePolicy(indexedTenants, egConditions);
336             Set<EgKey> updatedGroups = updatePolicy(policyMap, egConditions, policyListenerScopes);
337
338             notifyListeners(updatedGroups);
339         } catch (Exception e) {
340             LOG.error("Failed to update policy", e);
341         }
342     }
343
344     private Set<IndexedTenant> getIndexedTenants(Collection<TenantContext> tenantCtxs) {
345         Set<IndexedTenant> result = new HashSet<>();
346         for (TenantContext tenant : tenantCtxs) {
347             IndexedTenant t = tenant.tenant.get();
348             if (t != null) {
349                 result.add(t);
350             }
351         }
352         return result;
353     }
354
355     protected static class TenantContext {
356
357         ListenerRegistration<DataChangeListener> registration;
358
359         AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
360
361         public TenantContext(ListenerRegistration<DataChangeListener> registration) {
362             super();
363             this.registration = registration;
364         }
365     }
366
367     @Immutable
368     private class PolicyChangeListener implements DataChangeListener {
369
370         final TenantId tenantId;
371
372         public PolicyChangeListener(TenantId tenantId) {
373             super();
374             this.tenantId = tenantId;
375         }
376
377         @Override
378         public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
379             updateTenant(tenantId);
380         }
381
382     }
383
384     private boolean isValidTenant(Tenant t) {
385         ValidationResult validationResult = PolicyValidator.validate(t, this);
386         if (validationResult == null) {
387             return true;
388         }
389         return validationResult.getResult().getValue();
390     }
391
392     public ActionInstanceValidator getActionInstanceValidator(ActionDefinitionId actionDefinitionId) {
393         if (registeredActions == null) {
394             return null;
395         }
396         return registeredActions.get(actionDefinitionId);
397
398     }
399
400 }