Added logic for the PacketcableProvider class to manage failed gate requests as well...
[packetcable.git] / packetcable-policy-server / src / main / java / org / opendaylight / controller / packetcable / provider / PacketcableProvider.java
1 package org.opendaylight.controller.packetcable.provider;
2
3 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
4 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
5 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
6 import org.opendaylight.controller.md.sal.common.api.data.AsyncReadWriteTransaction;
7 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
8 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpPrefix;
9 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Prefix;
10 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.Ccap;
11 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.Qos;
12 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceClassName;
13 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceFlowDirection;
14 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ccap.Ccaps;
15 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ccap.CcapsKey;
16 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.Apps;
17 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.AppsKey;
18 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.Subs;
19 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.SubsKey;
20 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.subs.Gates;
21 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.subs.GatesKey;
22 import org.opendaylight.yangtools.yang.binding.DataObject;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.pcmm.rcd.IPCMMClient;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import javax.annotation.concurrent.ThreadSafe;
29 import java.net.InetAddress;
30 import java.net.UnknownHostException;
31 import java.util.*;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36
37 /**
38  * Called by ODL framework to start this bundle.
39  *
40  * This class is responsible for processing messages received from ODL's restconf interface.
41  * TODO - Remove some of these state maps and move some of this into the PCMMService
42  */
43 @ThreadSafe
44 public class PacketcableProvider implements DataChangeListener, AutoCloseable {
45
46     private static final Logger logger = LoggerFactory.getLogger(PacketcableProvider.class);
47
48     // keys to the /restconf/config/packetcable:ccap and /restconf/config/packetcable:qos config datastore
49     public static final InstanceIdentifier<Ccap> ccapIID = InstanceIdentifier.builder(Ccap.class).build();
50     public static final InstanceIdentifier<Qos> qosIID = InstanceIdentifier.builder(Qos.class).build();
51
52     /**
53      * The ODL object used to broker messages throughout the framework
54      */
55     private final DataBroker dataBroker;
56
57     /**
58      * The thread pool executor
59      */
60     private final ExecutorService executor;
61
62     // TODO - Revisit these maps and remove the ones no longer necessary
63     private final Map<String, Ccaps> ccapMap = new ConcurrentHashMap<>();
64     private final Map<String, Gates> gateMap = new ConcurrentHashMap<>();
65     private final Map<String, String> gateCcapMap = new ConcurrentHashMap<>();
66     private final Map<Subnet, Ccaps> subscriberSubnetsMap = new ConcurrentHashMap<>();
67     private final Map<ServiceClassName, List<Ccaps>> downstreamScnMap = new ConcurrentHashMap<>();
68     private final Map<ServiceClassName, List<Ccaps>> upstreamScnMap = new ConcurrentHashMap<>();
69
70     /**
71      * Holds a PCMMService object for each CCAP being managed.
72      */
73     private final Map<String, PCMMService> pcmmServiceMap = new ConcurrentHashMap<>();
74
75     /**
76      * Constructor
77      */
78     public PacketcableProvider(final DataBroker dataBroker) {
79         logger.info("Starting provider");
80         this.dataBroker = dataBroker;
81         executor = Executors.newCachedThreadPool();
82     }
83
84     /**
85      * Implemented from the AutoCloseable interface.
86      */
87     @Override
88     public void close() throws ExecutionException, InterruptedException {
89         executor.shutdown();
90         if (dataBroker != null) {
91             // remove our config datastore instances
92             final AsyncReadWriteTransaction<InstanceIdentifier<?>, ?> tx = dataBroker.newReadWriteTransaction();
93             tx.delete(LogicalDatastoreType.CONFIGURATION, ccapIID);
94             tx.delete(LogicalDatastoreType.CONFIGURATION, qosIID);
95             // TODO - commit() below has been deprecated
96             tx.commit().get();
97         }
98     }
99
100     public InetAddress getInetAddress(final String subId){
101         try {
102             return InetAddress.getByName(subId);
103         } catch (UnknownHostException e) {
104             logger.error("getInetAddress: {} FAILED: {}", subId, e.getMessage());
105             return null;
106         }
107     }
108
109     private String getIpPrefixStr(final IpPrefix ipPrefix) {
110         final Ipv4Prefix ipv4 = ipPrefix.getIpv4Prefix();
111         if (ipv4 != null) {
112             return ipv4.getValue();
113         } else {
114             return ipPrefix.getIpv6Prefix().getValue();
115         }
116     }
117
118     private void updateCcapMaps(final Ccaps ccap) {
119         // add ccap to the subscriberSubnets map
120         for (final IpPrefix ipPrefix : ccap.getSubscriberSubnets()) {
121             try {
122                 subscriberSubnetsMap.put(Subnet.createInstance(getIpPrefixStr(ipPrefix)), ccap);
123             } catch (UnknownHostException e) {
124                 logger.error("updateSubscriberSubnets: {}:{} FAILED: {}", ipPrefix, ccap, e.getMessage());
125             }
126         }
127         // ccap to upstream SCN map
128         for (final ServiceClassName scn : ccap.getUpstreamScns()) {
129             if (upstreamScnMap.containsKey(scn)) {
130                 upstreamScnMap.get(scn).add(ccap);
131             } else {
132                 final List<Ccaps> ccapList = new ArrayList<>();
133                 ccapList.add(ccap);
134                 upstreamScnMap.put(scn, ccapList);
135             }
136         }
137         // ccap to downstream SCN map
138         for (final ServiceClassName scn : ccap.getDownstreamScns()) {
139             if (downstreamScnMap.containsKey(scn)) {
140                 downstreamScnMap.get(scn).add(ccap);
141             } else {
142                 final List<Ccaps> ccapList = new ArrayList<>();
143                 ccapList.add(ccap);
144                 downstreamScnMap.put(scn, ccapList);
145             }
146         }
147     }
148
149     private void removeCcapFromAllMaps(final Ccaps ccap) {
150         // remove the ccap from all maps
151         // subscriberSubnets map
152         for (final Map.Entry<Subnet, Ccaps> entry : subscriberSubnetsMap.entrySet()) {
153             if (entry.getValue() == ccap) {
154                 subscriberSubnetsMap.remove(entry.getKey());
155             }
156         }
157         // ccap to upstream SCN map
158         for (final Map.Entry<ServiceClassName, List<Ccaps>> entry : upstreamScnMap.entrySet()) {
159             final List<Ccaps> ccapList = entry.getValue();
160             ccapList.remove(ccap);
161             if (ccapList.isEmpty()) {
162                 upstreamScnMap.remove(entry.getKey());
163             }
164         }
165         // ccap to downstream SCN map
166         for (final Map.Entry<ServiceClassName, List<Ccaps>> entry : downstreamScnMap.entrySet()) {
167             final List<Ccaps> ccapList = entry.getValue();
168             ccapList.remove(ccap);
169             if (ccapList.isEmpty()) {
170                 downstreamScnMap.remove(entry.getKey());
171             }
172         }
173
174         final PCMMService service = pcmmServiceMap.remove(ccap.getCcapId());
175         if (service != null) service.disconect();
176     }
177
178     private Ccaps findCcapForSubscriberId(final InetAddress inetAddr) {
179         Ccaps matchedCcap = null;
180         int longestPrefixLen = -1;
181         for (final Map.Entry<Subnet, Ccaps> entry : subscriberSubnetsMap.entrySet()) {
182             final Subnet subnet = entry.getKey();
183             if (subnet.isInNet(inetAddr)) {
184                 int prefixLen = subnet.getPrefixLen();
185                 if (prefixLen > longestPrefixLen) {
186                     matchedCcap = entry.getValue();
187                     longestPrefixLen = prefixLen;
188                 }
189             }
190         }
191         return matchedCcap;
192     }
193
194     private ServiceFlowDirection findScnOnCcap(final ServiceClassName scn, final Ccaps ccap) {
195         if (upstreamScnMap.containsKey(scn)) {
196             final List<Ccaps> ccapList = upstreamScnMap.get(scn);
197             if (ccapList.contains(ccap)) {
198                 return ServiceFlowDirection.Us;
199             }
200         } else if (downstreamScnMap.containsKey(scn)) {
201             final List<Ccaps> ccapList = downstreamScnMap.get(scn);
202             if (ccapList.contains(ccap)) {
203                 return ServiceFlowDirection.Ds;
204             }
205         }
206         return null;
207     }
208
209     /**
210      * Implemented from the DataChangeListener interface.
211      */
212
213     private class InstanceData {
214         // CCAP Identity
215         public final Map<InstanceIdentifier<Ccaps>, Ccaps> ccapIidMap = new HashMap<>();
216         // Gate Identity
217         public String subId;
218         public final Map<String, String> gatePathMap = new HashMap<>();
219         public String gatePath;
220         public final Map<InstanceIdentifier<Gates>, Gates> gateIidMap = new HashMap<>();
221         // remove path for either CCAP or Gates
222         public final Set<String> removePathList = new HashSet<>();
223
224         public InstanceData(final Map<InstanceIdentifier<?>, DataObject> thisData) {
225             // only used to parse createdData or updatedData
226             getCcaps(thisData);
227             if (ccapIidMap.isEmpty()) {
228                 getGates(thisData);
229                 if (! gateIidMap.isEmpty()){
230                     gatePath = gatePathMap.get("appId") + "/" + gatePathMap.get("subId");
231                 }
232             }
233         }
234
235         public InstanceData(final Set<InstanceIdentifier<?>> thisData) {
236             // only used to parse the removedData paths
237             for (final InstanceIdentifier<?> removeThis : thisData) {
238                 getGatePathMap(removeThis);
239                 if (gatePathMap.containsKey("ccapId")) {
240                     gatePath = gatePathMap.get("ccapId");
241                     removePathList.add(gatePath);
242                 } else if (gatePathMap.containsKey("gateId")) {
243                     gatePath = gatePathMap.get("appId") + "/" + gatePathMap.get("subId") + "/" + gatePathMap.get("gateId");
244                     removePathList.add(gatePath);
245                 }
246             }
247         }
248         private void getGatePathMap(final InstanceIdentifier<?> thisInstance) {
249             logger.info("onDataChanged().getGatePathMap(): " + thisInstance);
250             try {
251                 final InstanceIdentifier<Ccaps> ccapInstance = thisInstance.firstIdentifierOf(Ccaps.class);
252                 if (ccapInstance != null) {
253                     final CcapsKey ccapKey = InstanceIdentifier.keyOf(ccapInstance);
254                     if (ccapKey != null) {
255                         gatePathMap.put("ccapId", ccapKey.getCcapId());
256                     }
257                 } else {
258                     // get the gate path keys from the InstanceIdentifier Map key set if they are there
259                     final InstanceIdentifier<Apps> appsInstance = thisInstance.firstIdentifierOf(Apps.class);
260                     if (appsInstance != null) {
261                         final AppsKey appKey = InstanceIdentifier.keyOf(appsInstance);
262                         if (appKey != null) {
263                             gatePathMap.put("appId", appKey.getAppId());
264                         }
265                     }
266                     final InstanceIdentifier<Subs> subsInstance = thisInstance.firstIdentifierOf(Subs.class);
267                     if (subsInstance != null) {
268                         final SubsKey subKey = InstanceIdentifier.keyOf(subsInstance);
269                         if (subKey != null) {
270                             subId = subKey.getSubId();
271                             gatePathMap.put("subId", subId);
272                         }
273                     }
274                     final InstanceIdentifier<Gates> gatesInstance = thisInstance.firstIdentifierOf(Gates.class);
275                     if (gatesInstance != null) {
276                         final GatesKey gateKey = InstanceIdentifier.keyOf(gatesInstance);
277                         if (gateKey != null) {
278                             gatePathMap.put("gateId", gateKey.getGateId());
279                         }
280                     }
281                 }
282             } catch (ClassCastException err) {
283                 logger.warn("Unexpected exception", err);
284             }
285         }
286
287         private void getCcaps(final Map<InstanceIdentifier<?>, DataObject> thisData) {
288             logger.info("onDataChanged().getCcaps(): " + thisData);
289             for (final Map.Entry<InstanceIdentifier<?>, DataObject> entry : thisData.entrySet()) {
290                 if (entry.getValue() instanceof Ccaps) {
291                     // TODO FIXME - Potential ClassCastException thrown here!!!
292                     ccapIidMap.put((InstanceIdentifier<Ccaps>)entry.getKey(), (Ccaps)entry.getValue());
293                 }
294             }
295         }
296
297         private void getGates(final Map<InstanceIdentifier<?>, DataObject> thisData) {
298             logger.info("onDataChanged().getGates(): " + thisData);
299             for (final Map.Entry<InstanceIdentifier<?>, DataObject> entry : thisData.entrySet()) {
300                 if (entry.getValue() instanceof Gates) {
301                     final Gates gate = (Gates)entry.getValue();
302
303                     // TODO FIXME - Potential ClassCastException thrown here!!!
304                     final InstanceIdentifier<Gates> gateIID = (InstanceIdentifier<Gates>)entry.getKey();
305                     getGatePathMap(gateIID);
306                     gateIidMap.put(gateIID, gate);
307                 }
308             }
309         }
310     }
311
312     @Override
313     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
314         logger.info("onDataChanged");
315         // Determine what change action took place by looking at the change object's InstanceIdentifier sets
316         // and validate all instance data
317         if (!change.getCreatedData().isEmpty()) {
318             if (!new ValidateInstanceData(dataBroker, change.getCreatedData()).validateYang()) {
319                 // leave now -- a bad yang object has been detected and a response object has been inserted
320                 return;
321             }
322             onCreate(new InstanceData(change.getCreatedData()));
323         } else if (!change.getRemovedPaths().isEmpty()) {
324             onRemove(new InstanceData(change.getRemovedPaths()));
325         } else if (!change.getUpdatedData().isEmpty()) {
326             if (new ValidateInstanceData(dataBroker, change.getUpdatedData()).isResponseEcho()) {
327                 // leave now -- this is an echo of the inserted response object
328                 return;
329             }
330             onUpdate(new InstanceData(change.getUpdatedData()));
331         } else {
332             // we should not be here -- complain bitterly and return
333             logger.error("onDataChanged(): Unknown change action: " + change);
334         }
335     }
336
337     private void onCreate(final InstanceData thisData) {
338         logger.info("onCreate(): " + thisData);
339
340         // get the CCAP parameters
341         String message;
342         if (! thisData.ccapIidMap.isEmpty()) {
343             for (Map.Entry<InstanceIdentifier<Ccaps>, Ccaps> entry : thisData.ccapIidMap.entrySet()) {
344                 final Ccaps thisCcap = entry.getValue();
345                 // get the CCAP node identity from the Instance Data
346                 final String ccapId = thisCcap.getCcapId();
347
348                 if (pcmmServiceMap.get(thisCcap.getCcapId()) == null) {
349                     final PCMMService pcmmService = new PCMMService(IPCMMClient.CLIENT_TYPE, thisCcap);
350                     // TODO - may want to use the AMID but for the client type but probably not???
351 /*
352                             final PCMMService pcmmService = new PCMMService(
353                                     thisCcap.getAmId().getAmType().shortValue(), thisCcap);
354 */
355                     pcmmServiceMap.put(thisCcap.getCcapId(), pcmmService);
356                     message = pcmmService.addCcap();
357                     if (message.contains("200 OK")) {
358                         ccapMap.put(ccapId, thisCcap);
359                         updateCcapMaps(thisCcap);
360                         logger.info("Created CCAP: {}/{} : {}", thisData.gatePath, thisCcap, message);
361                         logger.info("Created CCAP: {} : {}", thisData.gatePath, message);
362                     } else {
363                         // TODO - when a connection cannot be made, need to remove CCAP from ODL cache.
364                         logger.error("Create CCAP Failed: {} : {}", thisData.gatePath, message);
365                     }
366                     // set the response string in the config ccap object using a new thread
367                     executor.execute(new Response(dataBroker, entry.getKey(), thisCcap, message));
368                 } else {
369                     logger.error("Already monitoring CCAP - " + thisCcap);
370                     break;
371                 }
372             }
373         } else {
374             // get the PCMM gate parameters from the ccapId/appId/subId/gateId path in the Maps entry (if new gate)
375             for (final Map.Entry<InstanceIdentifier<Gates>, Gates> entry : thisData.gateIidMap.entrySet()) {
376                 message = null;
377                 final Gates gate = entry.getValue();
378                 final String gateId = gate.getGateId();
379                 final String gatePathStr = thisData.gatePath + "/" + gateId ;
380                 final InetAddress subId = getInetAddress(thisData.subId);
381                 if (subId != null) {
382                     final Ccaps thisCcap = findCcapForSubscriberId(subId);
383                     if (thisCcap != null) {
384                         final String ccapId = thisCcap.getCcapId();
385                         // verify SCN exists on CCAP and force gateSpec.Direction to align with SCN direction
386                         final ServiceClassName scn = gate.getTrafficProfile().getServiceClassName();
387                         if (scn != null) {
388                             final ServiceFlowDirection scnDir = findScnOnCcap(scn, thisCcap);
389                             if (scnDir != null) {
390                                 if (pcmmServiceMap.get(thisCcap.getCcapId()) != null) {
391                                     message = pcmmServiceMap.get(thisCcap.getCcapId()).sendGateSet(gatePathStr, subId, gate, scnDir);
392                                     gateMap.put(gatePathStr, gate);
393                                     gateCcapMap.put(gatePathStr, thisCcap.getCcapId());
394
395                                     if (message.contains("200 OK")) {
396                                         logger.info("Created QoS gate {} for {}/{}/{} - {}",
397                                                 gateId, ccapId, gatePathStr, gate, message);
398                                         logger.info("Created QoS gate {} for {}/{} - {}",
399                                                 gateId, ccapId, gatePathStr, message);
400                                     } else {
401                                         logger.info("Unable to create QoS gate {} for {}/{}/{} - {}",
402                                                 gateId, ccapId, gatePathStr, gate, message);
403                                         logger.error("Unable to create QoS gate {} for {}/{} - {}",
404                                                 gateId, ccapId, gatePathStr, message);
405                                     }
406                                 } else {
407                                     logger.error("Unable to locate PCMM Service for CCAP - " + thisCcap);
408                                     break;
409                                 }
410                             } else {
411                                 logger.error("PCMMService: sendGateSet(): SCN {} not found on CCAP {} for {}/{}",
412                                         scn.getValue(), thisCcap, gatePathStr, gate);
413                                 message = String.format("404 Not Found - SCN %s not found on CCAP %s for %s",
414                                         scn.getValue(), thisCcap.getCcapId(), gatePathStr);
415                             }
416                         }
417                     } else {
418                         final String subIdStr = thisData.subId;
419                         message = String.format("404 Not Found - no CCAP found for subscriber %s in %s",
420                                 subIdStr, gatePathStr);
421                         logger.info("Create QoS gate {} FAILED: no CCAP found for subscriber {}: @ {}/{}",
422                                 gateId, subIdStr, gatePathStr, gate);
423                         logger.error("Create QoS gate {} FAILED: no CCAP found for subscriber {}: @ {}",
424                                 gateId, subIdStr, gatePathStr);
425                     }
426                 } else {
427                     final String subIdStr = thisData.subId;
428                     message = String.format("400 Bad Request - subId must be a valid IP address for subscriber %s in %s",
429                             subIdStr, gatePathStr);
430                     logger.info("Create QoS gate {} FAILED: subId must be a valid IP address for subscriber {}: @ {}/{}",
431                             gateId, subIdStr, gatePathStr, gate);
432                     logger.error("Create QoS gate {} FAILED: subId must be a valid IP address for subscriber {}: @ {}",
433                             gateId, subIdStr, gatePathStr);
434                 }
435                 // set the response message in the config gate object using a new thread
436                 executor.execute(new Response(dataBroker, entry.getKey(), gate, message));
437             }
438         }
439     }
440
441     private void onRemove(final InstanceData thisData) {
442         logger.info("onRemove(): " + thisData);
443         for (final String gatePathStr: thisData.removePathList) {
444             if (gateMap.containsKey(gatePathStr)) {
445                 final Gates thisGate = gateMap.remove(gatePathStr);
446                 final String gateId = thisGate.getGateId();
447                 final String ccapId = gateCcapMap.remove(gatePathStr);
448                 final Ccaps thisCcap = ccapMap.get(ccapId);
449                 final PCMMService service = pcmmServiceMap.get(thisCcap.getCcapId());
450                 if (service != null) {
451                     service.sendGateDelete(gatePathStr);
452                     logger.info("onDataChanged(): removed QoS gate {} for {}/{}/{}: ", gateId, ccapId, gatePathStr, thisGate);
453                     logger.info("onDataChanged(): removed QoS gate {} for {}/{}: ", gateId, ccapId, gatePathStr);
454                 } else
455                     logger.warn("Unable to send to locate PCMMService to send gate delete message with CCAP - "
456                             + thisCcap);
457             }
458         }
459         for (final String ccapIdStr: thisData.removePathList) {
460             if (ccapMap.containsKey(ccapIdStr)) {
461                 final Ccaps thisCcap = ccapMap.remove(ccapIdStr);
462                 removeCcapFromAllMaps(thisCcap);
463             }
464         }
465     }
466
467     private void onUpdate(final InstanceData oldData) {
468         logger.info("onUpdate(): " + oldData);
469         // update operation not allowed -- restore the original config object and complain
470         if (! oldData.ccapIidMap.isEmpty()) {
471             for (final Map.Entry<InstanceIdentifier<Ccaps>, Ccaps> entry : oldData.ccapIidMap.entrySet()) {
472                 final Ccaps ccap = entry.getValue();
473                 final String ccapId = ccap.getCcapId();
474                 String message = String.format("405 Method Not Allowed - %s: CCAP update not permitted (use delete); ",
475                         ccapId);
476                 // push new error message onto existing response
477                 message += ccap.getResponse();
478                 // set the response message in the config object using a new thread -- also restores the original data
479                 executor.execute(new Response(dataBroker, entry.getKey(), ccap, message));
480                 logger.error("onDataChanged(): CCAP update not permitted {}/{}", ccapId, ccap);
481             }
482         } else {
483             for (final Map.Entry<InstanceIdentifier<Gates>, Gates> entry : oldData.gateIidMap.entrySet()) {
484                 final Gates gate = entry.getValue();
485                 final String gatePathStr = oldData.gatePath + "/" + gate.getGateId() ;
486                 String message = String.format("405 Method Not Allowed - %s: QoS Gate update not permitted (use delete); ", gatePathStr);
487                 // push new error message onto existing response
488                 message += gate.getResponse();
489                 // set the response message in the config object using a new thread -- also restores the original data
490                 executor.execute(new Response(dataBroker, entry.getKey(), gate, message));
491                 logger.error("onDataChanged(): QoS Gate update not permitted: {}/{}", gatePathStr, gate);
492             }
493         }
494     }
495
496 }