723645fc2eb2fe6c21221defb280d2050711dddf
[groupbasedpolicy.git] / groupbasedpolicy / src / main / java / org / opendaylight / groupbasedpolicy / resolver / PolicyResolver.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.groupbasedpolicy.resolver;
9
10 import java.util.Collection;
11 import java.util.HashMap;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.CopyOnWriteArrayList;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.atomic.AtomicReference;
21
22 import javax.annotation.concurrent.Immutable;
23
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
31 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPoliciesBuilder;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.binding.DataObject;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.google.common.base.Optional;
45 import com.google.common.base.Predicate;
46 import com.google.common.collect.Sets;
47 import com.google.common.collect.Table;
48 import com.google.common.collect.Table.Cell;
49 import com.google.common.util.concurrent.FutureCallback;
50 import com.google.common.util.concurrent.Futures;
51 import com.google.common.util.concurrent.ListenableFuture;
52
53 import org.opendaylight.groupbasedpolicy.resolver.validator.PolicyValidator;
54 import org.opendaylight.groupbasedpolicy.resolver.validator.ValidationResult;
55 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
56
57 /**
58  * The policy resolver is a utility for renderers to help in resolving
59  * group-based policy into a form that is easier to apply to the actual network.
60  *
61  * For any pair of endpoint groups, there is a set of rules that could apply to
62  * the endpoints on that group based on the policy configuration. The exact list
63  * of rules that apply to a given pair of endpoints depends on the conditions
64  * that are active on the endpoints.
65  *
66  * We need to be able to query against this policy model, enumerate the relevant
67  * classes of traffic and endpoints, and notify renderers when there are changes
68  * to policy as it applies to active sets of endpoints and endpoint groups.
69  *
70  * The policy resolver will maintain the necessary state for all tenants in its
71  * control domain, which is the set of tenants for which policy listeners have
72  * been registered.
73  *
74  */
75 public class PolicyResolver implements AutoCloseable {
76
77     private static final Logger LOG = LoggerFactory.getLogger(PolicyResolver.class);
78
79     private final DataBroker dataProvider;
80     private final ScheduledExecutorService executor;
81
82     /**
83      * Keep track of the current relevant policy scopes.
84      */
85     protected CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
86
87     protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
88
89     /**
90      * Store a policy object for each endpoint group pair. The table is stored
91      * with the key as (consumer, provider). Two endpoints could appear in both
92      * roles at the same time, in which case both policies would apply.
93      */
94     AtomicReference<PolicyInfo> policy = new AtomicReference<>();
95
96     /*
97      * Store validators for ActionDefinitions from Renderers
98      *
99      */
100     protected ConcurrentMap<ActionDefinitionId, ActionInstanceValidator> registeredActions = new ConcurrentHashMap<>();
101
102     public PolicyResolver(DataBroker dataProvider,
103             ScheduledExecutorService executor) {
104         super();
105         this.dataProvider = dataProvider;
106         this.executor = executor;
107         policyListenerScopes = new CopyOnWriteArrayList<>();
108         resolvedTenants = new ConcurrentHashMap<>();
109         LOG.debug("Initialized renderer common policy resolver");
110     }
111
112     // *************
113     // AutoCloseable
114     // *************
115     @Override
116     public void close() throws Exception {
117         for (TenantContext ctx : resolvedTenants.values()) {
118             if (ctx.registration != null) {
119                 ctx.registration.close();
120             }
121         }
122     }
123
124     // *************************
125     // PolicyResolver public API
126     // *************************
127     /**
128      * Get a snapshot of the current policy
129      *
130      * @return the {@link PolicyInfo} object representing an immutable snapshot
131      * of the policy state
132      */
133     public PolicyInfo getCurrentPolicy() {
134         return policy.get();
135     }
136
137     /**
138      * Get the normalized tenant for the given ID
139      *
140      * @param tenant the tenant ID
141      * @return the {@link Tenant}
142      */
143     public IndexedTenant getTenant(TenantId tenant) {
144         TenantContext tc = resolvedTenants.get(tenant);
145         if (tc == null) {
146             return null;
147         }
148         return tc.tenant.get();
149     }
150
151     public void registerActionDefinitions(ActionDefinitionId actionDefinitionId, ActionInstanceValidator validator) {
152         registeredActions.putIfAbsent(actionDefinitionId, validator);
153     }
154
155     /**
156      * Register a listener to receive update events.
157      *
158      * @param listener the {@link PolicyListener} object to receive the update
159      * events
160      */
161     public PolicyScope registerListener(PolicyListener listener) {
162         PolicyScope ps = new PolicyScope(this, listener);
163         policyListenerScopes.add(ps);
164
165         return ps;
166     }
167
168     /**
169      * Remove the listener registered for the given {@link PolicyScope}.
170      *
171      * @param scope the scope to remove
172      * @see PolicyResolver#registerListener(PolicyListener)
173      */
174     public void removeListener(PolicyScope scope) {
175         policyListenerScopes.remove(scope);
176     }
177
178     // **************
179     // Implementation
180     // **************
181     /**
182      * Atomically update the active policy and notify policy listeners of
183      * relevant changes
184      *
185      * @param policyMap the new policy to set
186      * @param egConditions the map of endpoint groups to relevant condition sets
187      * @return the set of groups with updated policy
188      */
189     protected Set<EgKey> updatePolicy(Table<EgKey, EgKey, Policy> policyMap,
190             Map<EgKey, Set<ConditionSet>> egConditions,
191             List<PolicyScope> policyListenerScopes) {
192         PolicyInfo newPolicy = new PolicyInfo(policyMap, egConditions);
193         PolicyInfo oldPolicy = policy.getAndSet(newPolicy);
194
195         HashSet<EgKey> notifySet = new HashSet<>();
196
197         for (Cell<EgKey, EgKey, Policy> cell : newPolicy.getPolicyMap().cellSet()) {
198             Policy newp = cell.getValue();
199             Policy oldp = null;
200             if (oldPolicy != null) {
201                 oldp = oldPolicy.getPolicyMap().get(cell.getRowKey(),
202                         cell.getColumnKey());
203             }
204             if (oldp == null || !newp.equals(oldp)) {
205                 notifySet.add(cell.getRowKey());
206                 notifySet.add(cell.getColumnKey());
207             }
208         }
209         if (oldPolicy != null) {
210             for (Cell<EgKey, EgKey, Policy> cell : oldPolicy.getPolicyMap().cellSet()) {
211                 if (!newPolicy.getPolicyMap().contains(cell.getRowKey(),
212                         cell.getColumnKey())) {
213                     notifySet.add(cell.getRowKey());
214                     notifySet.add(cell.getColumnKey());
215                 }
216             }
217         }
218         return notifySet;
219     }
220
221     /**
222      * Notify the policy listeners about a set of updated groups
223      */
224     private void notifyListeners(Set<EgKey> updatedGroups) {
225         for (final PolicyScope scope : policyListenerScopes) {
226             Set<EgKey> filtered
227                     = Sets.filter(updatedGroups, new Predicate<EgKey>() {
228                         @Override
229                         public boolean apply(EgKey input) {
230                             return scope.contains(input.getTenantId(),
231                                     input.getEgId());
232                         }
233                     });
234             if (!filtered.isEmpty()) {
235                 scope.getListener().policyUpdated(filtered);
236             }
237         }
238     }
239
240     /**
241      * Subscribe the resolver to updates related to a particular tenant Make
242      * sure that this can't be called concurrently with subscribe
243      *
244      * @param tenantId the tenant ID to subscribe to
245      */
246     protected void subscribeTenant(TenantId tenantId) {
247         if (!resolvedTenants.containsKey(tenantId)) {
248             updateTenant(tenantId);
249         }
250     }
251
252     /**
253      * Unsubscribe the resolver from updates related to a particular tenant Make
254      * sure that this can't be called concurrently with subscribe
255      *
256      * @param tenantId the tenant ID to subscribe to
257      */
258     protected void unsubscribeTenant(TenantId tenantId) {
259         TenantContext context = resolvedTenants.get(tenantId);
260         if (context != null) {
261             resolvedTenants.remove(tenantId);
262             context.registration.close();
263         }
264     }
265
266     private void updateTenant(final TenantId tenantId) {
267         if (dataProvider == null) {
268             return;
269         }
270
271         TenantContext context = resolvedTenants.get(tenantId);
272         if (context == null) {
273             ListenerRegistration<DataChangeListener> registration = null;
274             registration = dataProvider
275                     .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
276                             TenantUtils.tenantIid(tenantId),
277                             new PolicyChangeListener(tenantId),
278                             DataChangeScope.SUBTREE);
279
280             context = new TenantContext(registration);
281             TenantContext oldContext
282                     = resolvedTenants.putIfAbsent(tenantId, context);
283             if (oldContext != null) {
284                 // already registered in a different thread; just use the other
285                 // context
286                 registration.close();
287                 context = oldContext;
288             } else {
289                 LOG.info("Added tenant {} to policy scope", tenantId);
290             }
291         }
292
293         // Resolve the new tenant and update atomically
294         final AtomicReference<IndexedTenant> tenantRef = context.tenant;
295         final IndexedTenant ot = tenantRef.get();
296         ReadOnlyTransaction transaction
297                 = dataProvider.newReadOnlyTransaction();
298         final InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
299         ListenableFuture<Optional<Tenant>> unresolved;
300
301         unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
302
303         Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
304             @Override
305             public void onSuccess(Optional<Tenant> result) {
306                 if (!result.isPresent()) {
307                     LOG.info("Tenant {} not found in CONF; check&delete from OPER", tenantId);
308                     deleteOperTenantIfExists(tiid, tenantId);
309                     return;
310                 }
311
312                 Tenant t = InheritanceUtils.resolveTenant(result.get());
313                 if (isValidTenant(t)) {
314                     IndexedTenant it = new IndexedTenant(t);
315                     if (!tenantRef.compareAndSet(ot, it)) {
316                         // concurrent update of tenant policy. Retry
317                         updateTenant(tenantId);
318                     } else {
319                         // Update the policy cache and notify listeners
320                         WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
321                         wt.put(LogicalDatastoreType.OPERATIONAL, tiid, t, true);
322                         wt.submit();
323                         updatePolicy();
324                     }
325                 }
326             }
327
328             @Override
329             public void onFailure(Throwable t) {
330                 LOG.error("Count not get tenant {}", tenantId, t);
331             }
332         }, executor);
333     }
334
335     private void deleteOperTenantIfExists(final InstanceIdentifier<Tenant> tiid, final TenantId tenantId) {
336         final ReadWriteTransaction rwTx = dataProvider.newReadWriteTransaction();
337
338         ListenableFuture<Optional<Tenant>> readFuture = rwTx.read(LogicalDatastoreType.OPERATIONAL, tiid);
339         Futures.addCallback(readFuture, new FutureCallback<Optional<Tenant>>() {
340             @Override
341             public void onSuccess(Optional<Tenant> result) {
342                 if(result.isPresent()){
343                     TenantContext tenantContext = resolvedTenants.get(tenantId);
344                     tenantContext.tenant.set(null);
345                     rwTx.delete(LogicalDatastoreType.OPERATIONAL, tiid);
346                     rwTx.submit();
347                     updatePolicy();
348                 }
349             }
350
351             @Override
352             public void onFailure(Throwable t) {
353                 LOG.error("Failed to read operational datastore: {}", t);
354                 rwTx.cancel();
355             }
356         }, executor);
357
358     }
359
360     protected void updatePolicy() {
361         try {
362             Map<EgKey, Set<ConditionSet>> egConditions = new HashMap<>();
363             Set<IndexedTenant> indexedTenants = getIndexedTenants(resolvedTenants.values());
364             Table<EgKey, EgKey, Policy> policyMap = PolicyResolverUtils.resolvePolicy(indexedTenants, egConditions);
365             Set<EgKey> updatedGroups = updatePolicy(policyMap, egConditions, policyListenerScopes);
366             updatePolicyInDataStore(policyMap);
367             //TODO the following will be removed when the policyInfo datastore is completed
368             notifyListeners(updatedGroups);
369         } catch (Exception e) {
370             LOG.error("Failed to update policy", e);
371         }
372     }
373
374     private void updatePolicyInDataStore(Table<EgKey, EgKey, Policy> policyMap) {
375         if (dataProvider == null) {
376             LOG.error("Couldn't Write Resolved Tenants Policy Info to Datastore because dataProvider is NULL");
377             return;
378         }
379         ResolvedPolicies resolvedPolicies = new ResolvedPoliciesBuilder().setResolvedPolicy(
380                 PolicyInfoUtils.buildResolvedPolicy(policyMap)).build();
381
382         WriteTransaction t = dataProvider.newWriteOnlyTransaction();
383         t.put(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(ResolvedPolicies.class).build(),
384                 resolvedPolicies, true);
385         if (DataStoreHelper.submitToDs(t)) {
386             LOG.debug("Wrote resolved policies to Datastore");
387         } else {
388             LOG.error("Failed to write resolved policies to Datastore.");
389         }
390     }
391     private Set<IndexedTenant> getIndexedTenants(Collection<TenantContext> tenantCtxs) {
392         Set<IndexedTenant> result = new HashSet<>();
393         for (TenantContext tenant : tenantCtxs) {
394             IndexedTenant t = tenant.tenant.get();
395             if (t != null) {
396                 result.add(t);
397             }
398         }
399         return result;
400     }
401
402     protected static class TenantContext {
403
404         ListenerRegistration<DataChangeListener> registration;
405
406         AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
407
408         public TenantContext(ListenerRegistration<DataChangeListener> registration) {
409             super();
410             this.registration = registration;
411         }
412     }
413
414     @Immutable
415     private class PolicyChangeListener implements DataChangeListener {
416
417         final TenantId tenantId;
418
419         public PolicyChangeListener(TenantId tenantId) {
420             super();
421             this.tenantId = tenantId;
422         }
423
424         @Override
425         public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
426             updateTenant(tenantId);
427         }
428
429     }
430
431     private boolean isValidTenant(Tenant t) {
432         ValidationResult validationResult = PolicyValidator.validate(t, this);
433         if (validationResult == null) {
434             return true;
435         }
436         return validationResult.getResult().getValue();
437     }
438
439     public ActionInstanceValidator getActionInstanceValidator(ActionDefinitionId actionDefinitionId) {
440         if (registeredActions == null) {
441             return null;
442         }
443         return registeredActions.get(actionDefinitionId);
444
445     }
446
447 }