Implement SFC integration
[groupbasedpolicy.git] / groupbasedpolicy / src / main / java / org / opendaylight / groupbasedpolicy / resolver / PolicyResolver.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.resolver;
10
11 import java.util.Collection;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.CopyOnWriteArrayList;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.atomic.AtomicReference;
22
23 import javax.annotation.concurrent.Immutable;
24
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
27 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.has.action.refs.ActionRef;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.Contract;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.contract.Subject;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.contract.subject.Rule;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.subject.feature.instances.ActionInstance;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.binding.DataObject;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import com.google.common.base.Optional;
47 import com.google.common.base.Predicate;
48 import com.google.common.collect.Sets;
49 import com.google.common.collect.Table;
50 import com.google.common.collect.Table.Cell;
51 import com.google.common.util.concurrent.FutureCallback;
52 import com.google.common.util.concurrent.Futures;
53 import com.google.common.util.concurrent.ListenableFuture;
54
55 /**
56  * The policy resolver is a utility for renderers to help in resolving
57  * group-based policy into a form that is easier to apply to the actual network.
58  *
59  * For any pair of endpoint groups, there is a set of rules that could apply to
60  * the endpoints on that group based on the policy configuration. The exact list
61  * of rules that apply to a given pair of endpoints depends on the conditions
62  * that are active on the endpoints.
63  *
64  * We need to be able to query against this policy model, enumerate the relevant
65  * classes of traffic and endpoints, and notify renderers when there are changes
66  * to policy as it applies to active sets of endpoints and endpoint groups.
67  *
68  * The policy resolver will maintain the necessary state for all tenants in its
69  * control domain, which is the set of tenants for which policy listeners have
70  * been registered.
71  *
72  */
73 public class PolicyResolver implements AutoCloseable {
74     private static final Logger LOG = LoggerFactory.getLogger(PolicyResolver.class);
75
76     private final DataBroker dataProvider;
77     private final ScheduledExecutorService executor;
78
79     /**
80      * Keep track of the current relevant policy scopes.
81      */
82     protected CopyOnWriteArrayList<PolicyScope> policyListenerScopes;
83
84     protected ConcurrentMap<TenantId, TenantContext> resolvedTenants;
85
86
87     /**
88      * Store a policy object for each endpoint group pair. The table is stored
89      * with the key as (consumer, provider). Two endpoints could appear in both
90      * roles at the same time, in which case both policies would apply.
91      */
92     AtomicReference<PolicyInfo> policy = new AtomicReference<>();
93
94     /*
95      * Store validators for ActionDefinitions from Renderers
96      *
97      */
98
99     protected ConcurrentMap<ActionDefinitionId, ActionInstanceValidator> registeredActions = new ConcurrentHashMap<>();
100
101     public PolicyResolver(DataBroker dataProvider,
102             ScheduledExecutorService executor) {
103         super();
104         this.dataProvider = dataProvider;
105         this.executor = executor;
106         policyListenerScopes = new CopyOnWriteArrayList<>();
107         resolvedTenants = new ConcurrentHashMap<>();
108         LOG.debug("Initialized renderer common policy resolver");
109     }
110
111     // *************
112     // AutoCloseable
113     // *************
114
115     @Override
116     public void close() throws Exception {
117         for (TenantContext ctx : resolvedTenants.values()) {
118             if (ctx.registration != null)
119                 ctx.registration.close();
120         }
121     }
122
123     // *************************
124     // PolicyResolver public API
125     // *************************
126
127     /**
128      * Get a snapshot of the current policy
129      *
130      * @return the {@link PolicyInfo} object representing an immutable snapshot
131      *         of the policy state
132      */
133     public PolicyInfo getCurrentPolicy() {
134         return policy.get();
135     }
136
137     /**
138      * Get the normalized tenant for the given ID
139      *
140      * @param tenant
141      *            the tenant ID
142      * @return the {@link Tenant}
143      */
144     public IndexedTenant getTenant(TenantId tenant) {
145         TenantContext tc = resolvedTenants.get(tenant);
146         if (tc == null)
147             return null;
148         return tc.tenant.get();
149     }
150
151     public void registerActionDefinitions(ActionDefinitionId actionDefinitionId, ActionInstanceValidator validator) {
152         registeredActions.putIfAbsent(actionDefinitionId, validator);
153     }
154     /**
155      * Register a listener to receive update events.
156      *
157      * @param listener
158      *            the {@link PolicyListener} object to receive the update events
159      */
160     public PolicyScope registerListener(PolicyListener listener) {
161         PolicyScope ps = new PolicyScope(this, listener);
162         policyListenerScopes.add(ps);
163
164         return ps;
165     }
166
167     /**
168      * Remove the listener registered for the given {@link PolicyScope}.
169      *
170      * @param scope
171      *            the scope to remove
172      * @see PolicyResolver#registerListener(PolicyListener)
173      */
174     public void removeListener(PolicyScope scope) {
175         policyListenerScopes.remove(scope);
176     }
177
178     // **************
179     // Implementation
180     // **************
181
182     /**
183      * Atomically update the active policy and notify policy listeners of
184      * relevant changes
185      *
186      * @param policyMap
187      *            the new policy to set
188      * @param egConditions
189      *            the map of endpoint groups to relevant condition sets
190      * @return the set of groups with updated policy
191      */
192     protected Set<EgKey> updatePolicy(Table<EgKey, EgKey, Policy> policyMap,
193             Map<EgKey, Set<ConditionSet>> egConditions,
194             List<PolicyScope> policyListenerScopes) {
195         PolicyInfo newPolicy = new PolicyInfo(policyMap, egConditions);
196         PolicyInfo oldPolicy = policy.getAndSet(newPolicy);
197
198         HashSet<EgKey> notifySet = new HashSet<>();
199
200         for (Cell<EgKey, EgKey, Policy> cell : newPolicy.getPolicyMap().cellSet()) {
201             Policy newp = cell.getValue();
202             Policy oldp = null;
203             if (oldPolicy != null)
204                 oldp = oldPolicy.getPolicyMap().get(cell.getRowKey(),
205                         cell.getColumnKey());
206             if (oldp == null || !newp.equals(oldp)) {
207                 notifySet.add(cell.getRowKey());
208                 notifySet.add(cell.getColumnKey());
209             }
210         }
211         if (oldPolicy != null) {
212             for (Cell<EgKey, EgKey, Policy> cell : oldPolicy.getPolicyMap().cellSet()) {
213                 if (!newPolicy.getPolicyMap().contains(cell.getRowKey(),
214                         cell.getColumnKey())) {
215                     notifySet.add(cell.getRowKey());
216                     notifySet.add(cell.getColumnKey());
217                 }
218             }
219         }
220         return notifySet;
221     }
222
223     /**
224      * Notify the policy listeners about a set of updated groups
225      */
226     private void notifyListeners(Set<EgKey> updatedGroups) {
227         for (final PolicyScope scope : policyListenerScopes) {
228             Set<EgKey> filtered =
229                     Sets.filter(updatedGroups, new Predicate<EgKey>() {
230                         @Override
231                         public boolean apply(EgKey input) {
232                             return scope.contains(input.getTenantId(),
233                                     input.getEgId());
234                         }
235                     });
236             if (!filtered.isEmpty()) {
237                 scope.getListener().policyUpdated(filtered);
238             }
239         }
240     }
241
242     /**
243      * Subscribe the resolver to updates related to a particular tenant Make
244      * sure that this can't be called concurrently with subscribe
245      *
246      * @param tenantId
247      *            the tenant ID to subscribe to
248      */
249     protected void subscribeTenant(TenantId tenantId) {
250         if (!resolvedTenants.containsKey(tenantId))
251             updateTenant(tenantId);
252     }
253
254     /**
255      * Unsubscribe the resolver from updates related to a particular tenant Make
256      * sure that this can't be called concurrently with subscribe
257      *
258      * @param tenantId
259      *            the tenant ID to subscribe to
260      */
261     protected void unsubscribeTenant(TenantId tenantId) {
262         TenantContext context = resolvedTenants.get(tenantId);
263         if (context != null) {
264             resolvedTenants.remove(tenantId);
265             context.registration.close();
266         }
267     }
268
269     private void updateTenant(final TenantId tenantId) {
270         if (dataProvider == null)
271             return;
272
273         TenantContext context = resolvedTenants.get(tenantId);
274         if (context == null) {
275             ListenerRegistration<DataChangeListener> registration = null;
276             registration = dataProvider
277                     .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
278                             TenantUtils.tenantIid(tenantId),
279                             new PolicyChangeListener(tenantId),
280                             DataChangeScope.SUBTREE);
281
282             context = new TenantContext(registration);
283             TenantContext oldContext =
284                     resolvedTenants.putIfAbsent(tenantId, context);
285             if (oldContext != null) {
286                 // already registered in a different thread; just use the other
287                 // context
288                 registration.close();
289                 context = oldContext;
290             } else {
291                 LOG.info("Added tenant {} to policy scope", tenantId);
292             }
293         }
294
295         // Resolve the new tenant and update atomically
296         final AtomicReference<IndexedTenant> tenantRef = context.tenant;
297         final IndexedTenant ot = tenantRef.get();
298         ReadOnlyTransaction transaction =
299                 dataProvider.newReadOnlyTransaction();
300         final InstanceIdentifier<Tenant> tiid = TenantUtils.tenantIid(tenantId);
301         ListenableFuture<Optional<Tenant>> unresolved;
302
303         unresolved = transaction.read(LogicalDatastoreType.CONFIGURATION, tiid);
304
305         Futures.addCallback(unresolved, new FutureCallback<Optional<Tenant>>() {
306             @Override
307             public void onSuccess(Optional<Tenant> result) {
308                 if (!result.isPresent()) {
309                     LOG.warn("Tenant {} not found", tenantId);
310                 }
311
312                 Tenant t = InheritanceUtils.resolveTenant(result.get());
313                 if (isValidTenant(t)) {
314                     IndexedTenant it = new IndexedTenant(t);
315                     if (!tenantRef.compareAndSet(ot, it)) {
316                         // concurrent update of tenant policy. Retry
317                         updateTenant(tenantId);
318                     } else {
319                         // Update the policy cache and notify listeners
320                         WriteTransaction wt = dataProvider.newWriteOnlyTransaction();
321                         wt.put(LogicalDatastoreType.OPERATIONAL, tiid, t, true);
322                         wt.submit();
323                         updatePolicy();
324                     }
325                 }
326             }
327
328
329
330
331
332             @Override
333             public void onFailure(Throwable t) {
334                 LOG.error("Count not get tenant {}", tenantId, t);
335             }
336         }, executor);
337     }
338
339     protected void updatePolicy() {
340         try {
341             Map<EgKey, Set<ConditionSet>> egConditions = new HashMap<>();
342             Set<IndexedTenant> indexedTenants = getIndexedTenants(resolvedTenants.values());
343             Table<EgKey, EgKey, Policy> policyMap = PolicyResolverUtils.resolvePolicy(indexedTenants, egConditions);
344             Set<EgKey> updatedGroups = updatePolicy(policyMap, egConditions, policyListenerScopes);
345
346             notifyListeners(updatedGroups);
347         } catch (Exception e) {
348             LOG.error("Failed to update policy", e);
349         }
350     }
351
352     private Set<IndexedTenant> getIndexedTenants(Collection<TenantContext> tenantCtxs) {
353         Set<IndexedTenant> result = new HashSet<>();
354         for (TenantContext tenant : tenantCtxs) {
355             IndexedTenant t = tenant.tenant.get();
356             if (t != null)
357                 result.add(t);
358         }
359         return result;
360     }
361
362     protected static class TenantContext {
363         ListenerRegistration<DataChangeListener> registration;
364
365         AtomicReference<IndexedTenant> tenant = new AtomicReference<>();
366
367         public TenantContext(ListenerRegistration<DataChangeListener> registration) {
368             super();
369             this.registration = registration;
370         }
371     }
372
373     @Immutable
374     private class PolicyChangeListener implements DataChangeListener {
375         final TenantId tenantId;
376
377         public PolicyChangeListener(TenantId tenantId) {
378             super();
379             this.tenantId = tenantId;
380         }
381
382         @Override
383         public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> arg0) {
384             updateTenant(tenantId);
385         }
386
387     }
388
389     private boolean isValidTenant(Tenant t) {
390         if(validActionInstances(t.getSubjectFeatureInstances().getActionInstance())) {
391             return true;
392         }
393         return false;
394     }
395
396     private boolean validActionInstances(List<ActionInstance> actionInstances) {
397         for(ActionInstance actionInstance : actionInstances) {
398             if(!(registeredActions.get(actionInstance.getActionDefinitionId()).isValid(actionInstance))) {
399                 return false;
400             };
401         }
402         return true;
403     }
404
405     private boolean validContracts(List<Contract> contracts) {
406         for (Contract contract: contracts) {
407             validateSubjects(contract.getSubject());
408         }
409         return false;
410     }
411
412     private void validateSubjects(List<Subject> subjects) {
413         for(Subject subject: subjects) {
414             validateRules(subject.getRule());
415         }
416
417     }
418
419     private void validateRules(List<Rule> rules) {
420
421     }
422
423     private void validateActionRefs(List<ActionRef> actionRefs) {
424
425     }
426 }