Convert DataChangeListeners to DataTreeChangeListeners
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / SfcManager.java
1 /*
2  * Copyright (c) 2015 Intel, Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.Collection;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.Set;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
25 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
27 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
28 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
31 import org.opendaylight.groupbasedpolicy.api.sf.ChainActionDefinition;
32 import org.opendaylight.groupbasedpolicy.util.IetfModelCodec;
33 import org.opendaylight.sfc.provider.SfcProviderRpc;
34 import org.opendaylight.sfc.provider.api.SfcProviderServiceChainAPI;
35 import org.opendaylight.sfc.provider.api.SfcProviderServicePathAPI;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.sfc.common.rev151017.SfcName;
37 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.sfc.rsp.rev140701.ReadRenderedServicePathFirstHopInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.sfc.rsp.rev140701.ReadRenderedServicePathFirstHopOutput;
39 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.sfc.rsp.rev140701.rendered.service.path.first.hop.info.RenderedServicePathFirstHop;
40 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.sfc.sfc.rev140701.service.function.chain.grouping.ServiceFunctionChain;
41 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.sfc.sfp.rev140701.ServiceFunctionPaths;
42 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.sfc.sfp.rev140701.service.function.paths.ServiceFunctionPath;
43 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.Tenants;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.subject.feature.definitions.ActionDefinition;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.subject.feature.definitions.ActionDefinitionKey;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.subject.feature.instance.ParameterValue;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.Tenant;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.Policy;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.policy.SubjectFeatureInstances;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.tenants.tenant.policy.subject.feature.instances.ActionInstance;
54 import org.opendaylight.yangtools.concepts.ListenerRegistration;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.RpcResult;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 /**
61  * Manage the state exchanged with SFC
62  *
63  * For the Proof of Concept, this manages the
64  * RenderedServicePathFirstHop elements that
65  * are retrieved from SFC.
66  *
67  */
68 public class SfcManager implements AutoCloseable, DataTreeChangeListener<ActionInstance> {
69     private static final Logger LOG =
70             LoggerFactory.getLogger(SfcManager.class);
71
72     private final DataBroker dataBroker;
73     private final ExecutorService executor;
74     private final InstanceIdentifier<ActionInstance> allActionInstancesIid;
75     private final ListenerRegistration<?> actionListener;
76
77     /*
78      * local cache of the RSP first hops that we've requested from SFC,
79      * keyed by RSP name
80      */
81     private final ConcurrentMap<String, RenderedServicePathFirstHop> rspMap;
82
83     /*
84      *  TODO: these two String defs should move to the common
85      *        "chain" action, once we have it.
86      */
87     // the chain action
88     public static final String SFC_CHAIN_ACTION = "chain";
89     // the parameter used for storing the chain name
90     public static final String SFC_CHAIN_NAME = "sfc-chain-name";
91
92     private static enum ActionState {
93         ADD("add"),
94         CHANGE("change"),
95         DELETE("delete");
96
97         private String state;
98
99         ActionState(String state) {
100             this.state = state;
101         }
102
103         @Override
104         public String toString() {
105             return this.state;
106         }
107     }
108
109
110     public SfcManager(DataBroker dataBroker,
111                       RpcProviderRegistry rpcRegistry,
112                       ExecutorService executor) {
113         this.dataBroker = dataBroker;
114         this.executor = executor;
115         /*
116          * Use thread-safe type only because we use an executor
117          */
118         this.rspMap = new ConcurrentHashMap<>();
119
120         /*
121          * For now, listen to all changes in rules
122          */
123         allActionInstancesIid =
124                 InstanceIdentifier.builder(Tenants.class)
125                     .child(Tenant.class)
126                     .child(Policy.class)
127                     .child(SubjectFeatureInstances.class)
128                     .child(ActionInstance.class)
129                     .build();
130         actionListener = dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(
131                 LogicalDatastoreType.CONFIGURATION, allActionInstancesIid), this);
132         LOG.debug("SfcManager: Started");
133     }
134
135     public Set<IpAddress> getSfcSourceIps() {
136         if (rspMap.isEmpty()) {
137             return null;
138         }
139
140         Set<IpAddress> ipAddresses = new HashSet<>();
141         for (RenderedServicePathFirstHop rsp: rspMap.values()) {
142             if (rsp.getIp() != null) {
143                 ipAddresses.add(IetfModelCodec.ipAddress2010(rsp.getIp()));
144             }
145         }
146         if (ipAddresses.isEmpty()) {
147             return null;
148         }
149         return ipAddresses;
150     }
151
152     @Override
153     public void onDataTreeChanged(Collection<DataTreeModification<ActionInstance>> changes) {
154         for (DataTreeModification<ActionInstance> change: changes) {
155             DataObjectModification<ActionInstance> rootNode = change.getRootNode();
156             final ActionInstance dataBefore = rootNode.getDataBefore();
157             final ActionInstance dataAfter = rootNode.getDataAfter();
158             switch (rootNode.getModificationType()) {
159                 case SUBTREE_MODIFIED:
160                 case WRITE:
161                     if (dataBefore == null) {
162                         LOG.debug("New ActionInstance created");
163                         executor.execute(new MatchActionDefTask(dataAfter, null, ActionState.ADD));
164                     } else {
165                         /*
166                          We may have some cleanup here.  If the reference to
167                          the Action Definition changed, or if the Action Instance's
168                          chain parameter  then we're no longer
169                          an action, and we may need to remove the RSP.
170                         */
171                         LOG.debug("ActionInstance updated");
172                         executor.execute(new MatchActionDefTask(dataAfter, dataBefore, ActionState.CHANGE));
173                     }
174                     break;
175                 case DELETE:
176                     LOG.debug("ActionInstance deleted");
177                     executor.execute(new MatchActionDefTask(null, dataBefore, ActionState.DELETE));
178                     break;
179                 default:
180                     break;
181             }
182         }
183     }
184
185     /**
186      * Private internal class that gets the action definition
187      * referenced by the instance. If the definition has an
188      * action of "chain" (or whatever we decide to use
189      * here), then we need to invoke the SFC API to go
190      * get the chain information, which we'll eventually
191      * use during policy resolution.
192      *
193      */
194     private class MatchActionDefTask implements Runnable,
195                      FutureCallback<Optional<ActionDefinition>> {
196         private final ActionState state;
197         private final ActionInstance actionInstance;
198         private final ActionInstance originalInstance;
199         private final InstanceIdentifier<ActionDefinition> adIid;
200         private final ActionDefinitionId id;
201
202         public MatchActionDefTask(ActionInstance actionInstance,
203                 ActionInstance originalInstance, ActionState state) {
204             this.actionInstance = actionInstance;
205             this.originalInstance = originalInstance;
206             if (actionInstance != null) {
207                 this.id = actionInstance.getActionDefinitionId();
208             } else {
209                 this.id = null;
210             }
211             this.state = state;
212
213             adIid = InstanceIdentifier.builder(SubjectFeatureDefinitions.class)
214                                       .child(ActionDefinition.class,
215                                              new ActionDefinitionKey(this.id))
216                                       .build();
217
218         }
219
220         /**
221          * Create read transaction with callback to look up
222          * the Action Definition that the Action Instance
223          * references.
224          */
225         @Override
226         public void run() {
227             ReadOnlyTransaction rot = dataBroker.newReadOnlyTransaction();
228             ListenableFuture<Optional<ActionDefinition>> dao =
229                     rot.read(LogicalDatastoreType.OPERATIONAL, adIid);
230             Futures.addCallback(dao, this, executor);
231
232         }
233
234         @Override
235         public void onFailure(Throwable arg0) {
236             LOG.error("Failure reading ActionDefinition {}", id.getValue());
237         }
238
239         /**
240          * An Action Definition exists - now we need to see
241          * if the Action Definition is for a chain action,
242          * and implement the appropriate behavior. If it's
243          * not a chain action, then we can ignore it.
244          *
245          * @param dao
246          */
247         @Override
248         public void onSuccess(Optional<ActionDefinition> dao) {
249             LOG.debug("Found ActionDefinition {}", id.getValue());
250             if (!dao.isPresent()) {
251                 return;
252             }
253
254             ActionDefinition ad = dao.get();
255             if (ad.getId().getValue().equals(ChainActionDefinition.ID.getValue())) {
256                 /*
257                  * We have the state we need:
258                  *  1) it's a "CHAIN" action
259                  *  2) the name is defined in the ActionInstance
260                  */
261                 switch (state) {
262                 case ADD:
263                     /*
264                      * Go get the RSP First Hop
265                      */
266                     getSfcChain();
267                     break;
268                 case CHANGE:
269                     /*
270                      * We only care if the named chain changes
271                      */
272                     changeSfcRsp();
273                     break;
274                 case DELETE:
275                     /*
276                      * If the instance is deleted, we need to remove
277                      * it from our map.
278                      */
279                     deleteSfcRsp();
280                     break;
281                 default:
282                     break;
283                 }
284             }
285         }
286
287         private ParameterValue getChainNameParameter(List<ParameterValue> pvl) {
288             if (pvl == null) {
289                 return null;
290             }
291             for (ParameterValue pv: actionInstance.getParameterValue()) {
292                 if (pv.getName().getValue().equals(SFC_CHAIN_NAME)) {
293                     return pv;
294                 }
295             }
296             return null;
297         }
298
299         private void changeSfcRsp() {
300             ParameterValue newPv =
301                     getChainNameParameter(actionInstance.getParameterValue());
302             ParameterValue origPv =
303                     getChainNameParameter(originalInstance.getParameterValue());
304             if (!newPv.getStringValue().equals(origPv.getStringValue())) {
305                 if (rspMap.containsKey(origPv.getStringValue())) {
306                     /*
307                      * Flow cleanup will happen as part of the
308                      * resolved policy
309                      *
310                      * TODO: can we guarantee that this
311                      *       happens after we remove the RSP?).
312                      */
313                     rspMap.remove(origPv.getStringValue());
314                 }
315                 addSfcRsp();
316             }
317         }
318
319         private void deleteSfcRsp() {
320             ParameterValue pv =
321                     getChainNameParameter(originalInstance.getParameterValue());
322             if (pv == null) {
323                 return;
324             }
325             rspMap.remove(pv.getStringValue());
326         }
327
328         /**
329          * Get the RenderedServicePathFirstHop from SFC
330          *
331          * TODO: what if SFC state isn't available at the time of
332          *       this call, but becomes available later?  Do we want
333          *       or need some sort of notification handler for this?
334          */
335         private void addSfcRsp() {
336             ParameterValue pv =
337                     getChainNameParameter(actionInstance.getParameterValue());
338             if (pv == null) {
339                 return;
340             }
341
342             LOG.trace("Invoking RPC for chain {}", pv.getStringValue());
343             ReadRenderedServicePathFirstHopInputBuilder builder =
344                 new ReadRenderedServicePathFirstHopInputBuilder()
345                        .setName(pv.getStringValue());
346             // TODO: make async
347             Future<RpcResult<ReadRenderedServicePathFirstHopOutput>> result =
348                 SfcProviderRpc.getSfcProviderRpc()
349                               .readRenderedServicePathFirstHop(builder.build());
350
351             try {
352                 RpcResult<ReadRenderedServicePathFirstHopOutput> output =
353                         result.get();
354                 if (output.isSuccessful()) {
355                     LOG.trace("RPC for chain {} succeeded!", pv.getStringValue());
356                     RenderedServicePathFirstHop rspFirstHop =
357                         output.getResult().getRenderedServicePathFirstHop();
358                     /*
359                      * We won't retry installation in the map
360                      * because the presumption is it's either
361                      * the same object or contain the same
362                      * state.
363                      */
364                     rspMap.putIfAbsent(pv.getStringValue(), rspFirstHop);
365                 }
366             } catch (Exception e) {
367                 LOG.warn("Failed ReadRenderedServicePathFirstHop RPC: {}", e);
368                 // TODO: proper exception handling
369             }
370         }
371
372         private void getSfcChain() {
373             ParameterValue pv =
374                     getChainNameParameter(actionInstance.getParameterValue());
375             if (pv == null) {
376                 return;
377             }
378
379             LOG.trace("Invoking RPC for chain {}", pv.getStringValue());
380             SfcName chainName=new SfcName(pv.getStringValue());
381             ServiceFunctionChain chain = SfcProviderServiceChainAPI.readServiceFunctionChain(chainName);
382             ServiceFunctionPaths paths = SfcProviderServicePathAPI.readAllServiceFunctionPaths();
383             for(ServiceFunctionPath path: paths.getServiceFunctionPath()) {
384                 if(path.getServiceChainName().equals(chainName)) {
385                     LOG.info("Found path {} for chain {}",path.getName(),path.getServiceChainName());
386                 }
387             }
388         }
389     }
390
391     /**
392      * Return the first hop information for the Rendered Service Path
393      *
394      * @param rspName the Rendered Service Path
395      * @return the first hop information for the Rendered Service Path
396      */
397     public RenderedServicePathFirstHop getRspFirstHop(String rspName) {
398         return rspMap.get(rspName);
399     }
400
401     @Override
402     public void close() throws Exception {
403         if (actionListener != null) {
404             actionListener.close();
405         }
406
407     }
408 }
409