1 package org.opendaylight.controller.packetcable.provider;
3 import java.net.InetAddress;
4 import java.net.UnknownHostException;
5 import java.util.ArrayList;
6 import java.util.HashMap;
7 import java.util.HashSet;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
16 import javax.annotation.concurrent.ThreadSafe;
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
23 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
24 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpPrefix;
25 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Prefix;
26 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.Ccap;
27 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.Qos;
28 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceClassName;
29 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceFlowDirection;
30 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ccap.Ccaps;
31 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ccap.CcapsKey;
32 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.Apps;
33 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.AppsKey;
34 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.Subs;
35 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.SubsKey;
36 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.subs.Gates;
37 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.subs.GatesKey;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.binding.DataObject;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.osgi.framework.BundleContext;
42 import org.osgi.framework.FrameworkUtil;
43 import org.osgi.framework.ServiceRegistration;
44 import org.pcmm.rcd.IPCMMClient;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 * Called by ODL framework to start this bundle.
51 * This class is responsible for processing messages received from ODL's restconf interface.
52 * TODO - Remove some of these state maps and move some of this into the PCMMService
55 public class PacketcableProvider implements BindingAwareProvider, DataChangeListener, AutoCloseable {
57 private static final Logger logger = LoggerFactory.getLogger(PacketcableProvider.class);
59 // keys to the /restconf/config/packetcable:ccap and /restconf/config/packetcable:qos config datastore
60 public static final InstanceIdentifier<Ccap> ccapIID = InstanceIdentifier.builder(Ccap.class).build();
61 public static final InstanceIdentifier<Qos> qosIID = InstanceIdentifier.builder(Qos.class).build();
64 * The ODL object used to broker messages throughout the framework
66 private DataBroker dataBroker;
68 private ServiceRegistration<PacketcableProvider> packetcableProviderRegistration;
70 private ListenerRegistration<DataChangeListener> ccapDataChangeListenerRegistration;
71 private ListenerRegistration<DataChangeListener> qosDataChangeListenerRegistration;
74 * The thread pool executor
76 private final ExecutorService executor;
78 // TODO - Revisit these maps and remove the ones no longer necessary
79 private final Map<String, Ccaps> ccapMap = new ConcurrentHashMap<>();
80 private final Map<String, Gates> gateMap = new ConcurrentHashMap<>();
81 private final Map<String, String> gateCcapMap = new ConcurrentHashMap<>();
82 private final Map<Subnet, Ccaps> subscriberSubnetsMap = new ConcurrentHashMap<>();
83 private final Map<ServiceClassName, List<Ccaps>> downstreamScnMap = new ConcurrentHashMap<>();
84 private final Map<ServiceClassName, List<Ccaps>> upstreamScnMap = new ConcurrentHashMap<>();
87 * Holds a PCMMService object for each CCAP being managed.
89 private final Map<String, PCMMService> pcmmServiceMap = new ConcurrentHashMap<>();
94 public PacketcableProvider() {
95 logger.info("Starting provider");
96 executor = Executors.newCachedThreadPool();
100 public void onSessionInitiated(ProviderContext session) {
101 logger.info("Packetcable Session Initiated");
103 dataBroker = session.getSALService(DataBroker.class);
105 BundleContext context = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
106 packetcableProviderRegistration = context.registerService(PacketcableProvider.class, this, null);
108 ccapDataChangeListenerRegistration =
109 dataBroker.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
110 PacketcableProvider.ccapIID, this, DataBroker.DataChangeScope.SUBTREE );
112 qosDataChangeListenerRegistration =
113 dataBroker.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
114 PacketcableProvider.qosIID, this, DataBroker.DataChangeScope.SUBTREE );
117 * Implemented from the AutoCloseable interface.
120 public void close() throws ExecutionException, InterruptedException {
122 if (packetcableProviderRegistration != null) {
123 packetcableProviderRegistration.unregister();
125 if (ccapDataChangeListenerRegistration != null) {
126 ccapDataChangeListenerRegistration.close();
129 if (qosDataChangeListenerRegistration != null) {
130 qosDataChangeListenerRegistration.close();
134 public InetAddress getInetAddress(final String subId){
136 return InetAddress.getByName(subId);
137 } catch (UnknownHostException e) {
138 logger.error("getInetAddress: {} FAILED: {}", subId, e.getMessage());
143 private String getIpPrefixStr(final IpPrefix ipPrefix) {
144 final Ipv4Prefix ipv4 = ipPrefix.getIpv4Prefix();
146 return ipv4.getValue();
148 return ipPrefix.getIpv6Prefix().getValue();
152 private void updateCcapMaps(final Ccaps ccap) {
153 // add ccap to the subscriberSubnets map
154 for (final IpPrefix ipPrefix : ccap.getSubscriberSubnets()) {
156 subscriberSubnetsMap.put(Subnet.createInstance(getIpPrefixStr(ipPrefix)), ccap);
157 } catch (UnknownHostException e) {
158 logger.error("updateSubscriberSubnets: {}:{} FAILED: {}", ipPrefix, ccap, e.getMessage());
161 // ccap to upstream SCN map
162 for (final ServiceClassName scn : ccap.getUpstreamScns()) {
163 if (upstreamScnMap.containsKey(scn)) {
164 upstreamScnMap.get(scn).add(ccap);
166 final List<Ccaps> ccapList = new ArrayList<>();
168 upstreamScnMap.put(scn, ccapList);
171 // ccap to downstream SCN map
172 for (final ServiceClassName scn : ccap.getDownstreamScns()) {
173 if (downstreamScnMap.containsKey(scn)) {
174 downstreamScnMap.get(scn).add(ccap);
176 final List<Ccaps> ccapList = new ArrayList<>();
178 downstreamScnMap.put(scn, ccapList);
183 private void removeCcapFromAllMaps(final Ccaps ccap) {
184 // remove the ccap from all maps
185 // subscriberSubnets map
186 for (final Map.Entry<Subnet, Ccaps> entry : subscriberSubnetsMap.entrySet()) {
187 if (entry.getValue() == ccap) {
188 subscriberSubnetsMap.remove(entry.getKey());
191 // ccap to upstream SCN map
192 for (final Map.Entry<ServiceClassName, List<Ccaps>> entry : upstreamScnMap.entrySet()) {
193 final List<Ccaps> ccapList = entry.getValue();
194 ccapList.remove(ccap);
195 if (ccapList.isEmpty()) {
196 upstreamScnMap.remove(entry.getKey());
199 // ccap to downstream SCN map
200 for (final Map.Entry<ServiceClassName, List<Ccaps>> entry : downstreamScnMap.entrySet()) {
201 final List<Ccaps> ccapList = entry.getValue();
202 ccapList.remove(ccap);
203 if (ccapList.isEmpty()) {
204 downstreamScnMap.remove(entry.getKey());
208 final PCMMService service = pcmmServiceMap.remove(ccap.getCcapId());
209 if (service != null) service.disconect();
212 private Ccaps findCcapForSubscriberId(final InetAddress inetAddr) {
213 Ccaps matchedCcap = null;
214 int longestPrefixLen = -1;
215 for (final Map.Entry<Subnet, Ccaps> entry : subscriberSubnetsMap.entrySet()) {
216 final Subnet subnet = entry.getKey();
217 if (subnet.isInNet(inetAddr)) {
218 int prefixLen = subnet.getPrefixLen();
219 if (prefixLen > longestPrefixLen) {
220 matchedCcap = entry.getValue();
221 longestPrefixLen = prefixLen;
228 private ServiceFlowDirection findScnOnCcap(final ServiceClassName scn, final Ccaps ccap) {
229 if (upstreamScnMap.containsKey(scn)) {
230 final List<Ccaps> ccapList = upstreamScnMap.get(scn);
231 if (ccapList.contains(ccap)) {
232 return ServiceFlowDirection.Us;
234 } else if (downstreamScnMap.containsKey(scn)) {
235 final List<Ccaps> ccapList = downstreamScnMap.get(scn);
236 if (ccapList.contains(ccap)) {
237 return ServiceFlowDirection.Ds;
244 * Implemented from the DataChangeListener interface.
247 private class InstanceData {
249 public final Map<InstanceIdentifier<Ccaps>, Ccaps> ccapIidMap = new HashMap<>();
252 public final Map<String, String> gatePathMap = new HashMap<>();
253 public String gatePath;
254 public final Map<InstanceIdentifier<Gates>, Gates> gateIidMap = new HashMap<>();
255 // remove path for either CCAP or Gates
256 public final Set<String> removePathList = new HashSet<>();
258 public InstanceData(final Map<InstanceIdentifier<?>, DataObject> thisData) {
259 // only used to parse createdData or updatedData
261 if (ccapIidMap.isEmpty()) {
263 if (! gateIidMap.isEmpty()){
264 gatePath = gatePathMap.get("appId") + "/" + gatePathMap.get("subId");
269 public InstanceData(final Set<InstanceIdentifier<?>> thisData) {
270 // only used to parse the removedData paths
271 for (final InstanceIdentifier<?> removeThis : thisData) {
272 getGatePathMap(removeThis);
273 if (gatePathMap.containsKey("ccapId")) {
274 gatePath = gatePathMap.get("ccapId");
275 removePathList.add(gatePath);
276 } else if (gatePathMap.containsKey("gateId")) {
277 gatePath = gatePathMap.get("appId") + "/" + gatePathMap.get("subId") + "/" + gatePathMap.get("gateId");
278 removePathList.add(gatePath);
282 private void getGatePathMap(final InstanceIdentifier<?> thisInstance) {
283 logger.info("onDataChanged().getGatePathMap(): " + thisInstance);
285 final InstanceIdentifier<Ccaps> ccapInstance = thisInstance.firstIdentifierOf(Ccaps.class);
286 if (ccapInstance != null) {
287 final CcapsKey ccapKey = InstanceIdentifier.keyOf(ccapInstance);
288 if (ccapKey != null) {
289 gatePathMap.put("ccapId", ccapKey.getCcapId());
292 // get the gate path keys from the InstanceIdentifier Map key set if they are there
293 final InstanceIdentifier<Apps> appsInstance = thisInstance.firstIdentifierOf(Apps.class);
294 if (appsInstance != null) {
295 final AppsKey appKey = InstanceIdentifier.keyOf(appsInstance);
296 if (appKey != null) {
297 gatePathMap.put("appId", appKey.getAppId());
300 final InstanceIdentifier<Subs> subsInstance = thisInstance.firstIdentifierOf(Subs.class);
301 if (subsInstance != null) {
302 final SubsKey subKey = InstanceIdentifier.keyOf(subsInstance);
303 if (subKey != null) {
304 subId = subKey.getSubId();
305 gatePathMap.put("subId", subId);
308 final InstanceIdentifier<Gates> gatesInstance = thisInstance.firstIdentifierOf(Gates.class);
309 if (gatesInstance != null) {
310 final GatesKey gateKey = InstanceIdentifier.keyOf(gatesInstance);
311 if (gateKey != null) {
312 gatePathMap.put("gateId", gateKey.getGateId());
316 } catch (ClassCastException err) {
317 logger.warn("Unexpected exception", err);
321 private void getCcaps(final Map<InstanceIdentifier<?>, DataObject> thisData) {
322 logger.info("onDataChanged().getCcaps(): " + thisData);
323 for (final Map.Entry<InstanceIdentifier<?>, DataObject> entry : thisData.entrySet()) {
324 if (entry.getValue() instanceof Ccaps) {
325 // TODO FIXME - Potential ClassCastException thrown here!!!
326 ccapIidMap.put((InstanceIdentifier<Ccaps>)entry.getKey(), (Ccaps)entry.getValue());
331 private void getGates(final Map<InstanceIdentifier<?>, DataObject> thisData) {
332 logger.info("onDataChanged().getGates(): " + thisData);
333 for (final Map.Entry<InstanceIdentifier<?>, DataObject> entry : thisData.entrySet()) {
334 if (entry.getValue() instanceof Gates) {
335 final Gates gate = (Gates)entry.getValue();
337 // TODO FIXME - Potential ClassCastException thrown here!!!
338 final InstanceIdentifier<Gates> gateIID = (InstanceIdentifier<Gates>)entry.getKey();
339 getGatePathMap(gateIID);
340 gateIidMap.put(gateIID, gate);
347 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
348 logger.info("onDataChanged");
349 // Determine what change action took place by looking at the change object's InstanceIdentifier sets
350 // and validate all instance data
351 if (!change.getCreatedData().isEmpty()) {
352 if (!new ValidateInstanceData(dataBroker, change.getCreatedData()).validateYang()) {
353 // leave now -- a bad yang object has been detected and a response object has been inserted
356 onCreate(new InstanceData(change.getCreatedData()));
357 } else if (!change.getRemovedPaths().isEmpty()) {
358 onRemove(new InstanceData(change.getRemovedPaths()));
359 } else if (!change.getUpdatedData().isEmpty()) {
360 if (new ValidateInstanceData(dataBroker, change.getUpdatedData()).isResponseEcho()) {
361 // leave now -- this is an echo of the inserted response object
364 onUpdate(new InstanceData(change.getUpdatedData()));
366 // we should not be here -- complain bitterly and return
367 logger.error("onDataChanged(): Unknown change action: " + change);
371 private void onCreate(final InstanceData thisData) {
372 logger.info("onCreate(): " + thisData);
374 // get the CCAP parameters
376 if (! thisData.ccapIidMap.isEmpty()) {
377 for (Map.Entry<InstanceIdentifier<Ccaps>, Ccaps> entry : thisData.ccapIidMap.entrySet()) {
378 final Ccaps thisCcap = entry.getValue();
379 // get the CCAP node identity from the Instance Data
380 final String ccapId = thisCcap.getCcapId();
382 if (pcmmServiceMap.get(thisCcap.getCcapId()) == null) {
383 final PCMMService pcmmService = new PCMMService(IPCMMClient.CLIENT_TYPE, thisCcap);
384 // TODO - may want to use the AMID but for the client type but probably not???
386 final PCMMService pcmmService = new PCMMService(
387 thisCcap.getAmId().getAmType().shortValue(), thisCcap);
389 pcmmServiceMap.put(thisCcap.getCcapId(), pcmmService);
390 message = pcmmService.addCcap();
391 if (message.contains("200 OK")) {
392 ccapMap.put(ccapId, thisCcap);
393 updateCcapMaps(thisCcap);
394 logger.info("Created CCAP: {}/{} : {}", thisData.gatePath, thisCcap, message);
395 logger.info("Created CCAP: {} : {}", thisData.gatePath, message);
397 // TODO - when a connection cannot be made, need to remove CCAP from ODL cache.
398 logger.error("Create CCAP Failed: {} : {}", thisData.gatePath, message);
400 // set the response string in the config ccap object using a new thread
401 executor.execute(new Response(dataBroker, entry.getKey(), thisCcap, message));
403 logger.error("Already monitoring CCAP - " + thisCcap);
408 // get the PCMM gate parameters from the ccapId/appId/subId/gateId path in the Maps entry (if new gate)
409 for (final Map.Entry<InstanceIdentifier<Gates>, Gates> entry : thisData.gateIidMap.entrySet()) {
411 final Gates gate = entry.getValue();
412 final String gateId = gate.getGateId();
413 final String gatePathStr = thisData.gatePath + "/" + gateId ;
414 final InetAddress subId = getInetAddress(thisData.subId);
416 final Ccaps thisCcap = findCcapForSubscriberId(subId);
417 if (thisCcap != null) {
418 final String ccapId = thisCcap.getCcapId();
419 // verify SCN exists on CCAP and force gateSpec.Direction to align with SCN direction
420 final ServiceClassName scn = gate.getTrafficProfile().getServiceClassName();
422 final ServiceFlowDirection scnDir = findScnOnCcap(scn, thisCcap);
423 if (scnDir != null) {
424 if (pcmmServiceMap.get(thisCcap.getCcapId()) != null) {
425 message = pcmmServiceMap.get(thisCcap.getCcapId()).sendGateSet(gatePathStr, subId, gate, scnDir);
426 gateMap.put(gatePathStr, gate);
427 gateCcapMap.put(gatePathStr, thisCcap.getCcapId());
429 if (message.contains("200 OK")) {
430 logger.info("Created QoS gate {} for {}/{}/{} - {}",
431 gateId, ccapId, gatePathStr, gate, message);
432 logger.info("Created QoS gate {} for {}/{} - {}",
433 gateId, ccapId, gatePathStr, message);
435 logger.info("Unable to create QoS gate {} for {}/{}/{} - {}",
436 gateId, ccapId, gatePathStr, gate, message);
437 logger.error("Unable to create QoS gate {} for {}/{} - {}",
438 gateId, ccapId, gatePathStr, message);
441 logger.error("Unable to locate PCMM Service for CCAP - " + thisCcap);
445 logger.error("PCMMService: sendGateSet(): SCN {} not found on CCAP {} for {}/{}",
446 scn.getValue(), thisCcap, gatePathStr, gate);
447 message = String.format("404 Not Found - SCN %s not found on CCAP %s for %s",
448 scn.getValue(), thisCcap.getCcapId(), gatePathStr);
452 final String subIdStr = thisData.subId;
453 message = String.format("404 Not Found - no CCAP found for subscriber %s in %s",
454 subIdStr, gatePathStr);
455 logger.info("Create QoS gate {} FAILED: no CCAP found for subscriber {}: @ {}/{}",
456 gateId, subIdStr, gatePathStr, gate);
457 logger.error("Create QoS gate {} FAILED: no CCAP found for subscriber {}: @ {}",
458 gateId, subIdStr, gatePathStr);
461 final String subIdStr = thisData.subId;
462 message = String.format("400 Bad Request - subId must be a valid IP address for subscriber %s in %s",
463 subIdStr, gatePathStr);
464 logger.info("Create QoS gate {} FAILED: subId must be a valid IP address for subscriber {}: @ {}/{}",
465 gateId, subIdStr, gatePathStr, gate);
466 logger.error("Create QoS gate {} FAILED: subId must be a valid IP address for subscriber {}: @ {}",
467 gateId, subIdStr, gatePathStr);
469 // set the response message in the config gate object using a new thread
470 executor.execute(new Response(dataBroker, entry.getKey(), gate, message));
475 private void onRemove(final InstanceData thisData) {
476 logger.info("onRemove(): " + thisData);
477 for (final String gatePathStr: thisData.removePathList) {
478 if (gateMap.containsKey(gatePathStr)) {
479 final Gates thisGate = gateMap.remove(gatePathStr);
480 final String gateId = thisGate.getGateId();
481 final String ccapId = gateCcapMap.remove(gatePathStr);
482 final Ccaps thisCcap = ccapMap.get(ccapId);
483 final PCMMService service = pcmmServiceMap.get(thisCcap.getCcapId());
484 if (service != null) {
485 service.sendGateDelete(gatePathStr);
486 logger.info("onDataChanged(): removed QoS gate {} for {}/{}/{}: ", gateId, ccapId, gatePathStr, thisGate);
487 logger.info("onDataChanged(): removed QoS gate {} for {}/{}: ", gateId, ccapId, gatePathStr);
489 logger.warn("Unable to send to locate PCMMService to send gate delete message with CCAP - "
493 for (final String ccapIdStr: thisData.removePathList) {
494 if (ccapMap.containsKey(ccapIdStr)) {
495 final Ccaps thisCcap = ccapMap.remove(ccapIdStr);
496 removeCcapFromAllMaps(thisCcap);
501 private void onUpdate(final InstanceData oldData) {
502 logger.info("onUpdate(): " + oldData);
503 // update operation not allowed -- restore the original config object and complain
504 if (! oldData.ccapIidMap.isEmpty()) {
505 for (final Map.Entry<InstanceIdentifier<Ccaps>, Ccaps> entry : oldData.ccapIidMap.entrySet()) {
506 final Ccaps ccap = entry.getValue();
507 final String ccapId = ccap.getCcapId();
508 String message = String.format("405 Method Not Allowed - %s: CCAP update not permitted (use delete); ",
510 // push new error message onto existing response
511 message += ccap.getResponse();
512 // set the response message in the config object using a new thread -- also restores the original data
513 executor.execute(new Response(dataBroker, entry.getKey(), ccap, message));
514 logger.error("onDataChanged(): CCAP update not permitted {}/{}", ccapId, ccap);
517 for (final Map.Entry<InstanceIdentifier<Gates>, Gates> entry : oldData.gateIidMap.entrySet()) {
518 final Gates gate = entry.getValue();
519 final String gatePathStr = oldData.gatePath + "/" + gate.getGateId() ;
520 String message = String.format("405 Method Not Allowed - %s: QoS Gate update not permitted (use delete); ", gatePathStr);
521 // push new error message onto existing response
522 message += gate.getResponse();
523 // set the response message in the config object using a new thread -- also restores the original data
524 executor.execute(new Response(dataBroker, entry.getKey(), gate, message));
525 logger.error("onDataChanged(): QoS Gate update not permitted: {}/{}", gatePathStr, gate);