1 package org.opendaylight.controller.packetcable.provider;
3 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
4 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
5 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
6 import org.opendaylight.controller.md.sal.common.api.data.AsyncReadWriteTransaction;
7 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
8 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpPrefix;
9 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Prefix;
10 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.Ccap;
11 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.Qos;
12 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceClassName;
13 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceFlowDirection;
14 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ccap.Ccaps;
15 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ccap.CcapsKey;
16 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.Apps;
17 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.AppsKey;
18 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.Subs;
19 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.SubsKey;
20 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.subs.Gates;
21 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.subs.GatesKey;
22 import org.opendaylight.yangtools.yang.binding.DataObject;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.pcmm.rcd.IPCMMClient;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 import javax.annotation.concurrent.ThreadSafe;
29 import java.net.InetAddress;
30 import java.net.UnknownHostException;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
38 * Called by ODL framework to start this bundle.
40 * This class is responsible for processing messages received from ODL's restconf interface.
41 * TODO - Remove some of these state maps and move some of this into the PCMMService
44 public class PacketcableProvider implements DataChangeListener, AutoCloseable {
46 private static final Logger logger = LoggerFactory.getLogger(PacketcableProvider.class);
48 // keys to the /restconf/config/packetcable:ccap and /restconf/config/packetcable:qos config datastore
49 public static final InstanceIdentifier<Ccap> ccapIID = InstanceIdentifier.builder(Ccap.class).build();
50 public static final InstanceIdentifier<Qos> qosIID = InstanceIdentifier.builder(Qos.class).build();
53 * The ODL object used to broker messages throughout the framework
55 private final DataBroker dataBroker;
58 * The thread pool executor
60 private final ExecutorService executor;
62 // TODO - Revisit these maps and remove the ones no longer necessary
63 private final Map<String, Ccaps> ccapMap = new ConcurrentHashMap<>();
64 private final Map<String, Gates> gateMap = new ConcurrentHashMap<>();
65 private final Map<String, String> gateCcapMap = new ConcurrentHashMap<>();
66 private final Map<Subnet, Ccaps> subscriberSubnetsMap = new ConcurrentHashMap<>();
67 private final Map<ServiceClassName, List<Ccaps>> downstreamScnMap = new ConcurrentHashMap<>();
68 private final Map<ServiceClassName, List<Ccaps>> upstreamScnMap = new ConcurrentHashMap<>();
71 * Holds a PCMMService object for each CCAP being managed.
73 private final Map<Ccaps, PCMMService> pcmmServiceMap = new ConcurrentHashMap<>();
78 public PacketcableProvider(final DataBroker dataBroker) {
79 logger.info("Starting provider");
80 this.dataBroker = dataBroker;
81 executor = Executors.newCachedThreadPool();
85 * Implemented from the AutoCloseable interface.
88 public void close() throws ExecutionException, InterruptedException {
90 if (dataBroker != null) {
91 // remove our config datastore instances
92 final AsyncReadWriteTransaction<InstanceIdentifier<?>, ?> tx = dataBroker.newReadWriteTransaction();
93 tx.delete(LogicalDatastoreType.CONFIGURATION, ccapIID);
94 tx.delete(LogicalDatastoreType.CONFIGURATION, qosIID);
95 // TODO - commit() below has been deprecated
100 public InetAddress getInetAddress(final String subId){
102 return InetAddress.getByName(subId);
103 } catch (UnknownHostException e) {
104 logger.error("getInetAddress: {} FAILED: {}", subId, e.getMessage());
109 private String getIpPrefixStr(final IpPrefix ipPrefix) {
110 final Ipv4Prefix ipv4 = ipPrefix.getIpv4Prefix();
112 return ipv4.getValue();
114 return ipPrefix.getIpv6Prefix().getValue();
118 private void updateCcapMaps(final Ccaps ccap) {
119 // add ccap to the subscriberSubnets map
120 for (final IpPrefix ipPrefix : ccap.getSubscriberSubnets()) {
122 subscriberSubnetsMap.put(Subnet.createInstance(getIpPrefixStr(ipPrefix)), ccap);
123 } catch (UnknownHostException e) {
124 logger.error("updateSubscriberSubnets: {}:{} FAILED: {}", ipPrefix, ccap, e.getMessage());
127 // ccap to upstream SCN map
128 for (final ServiceClassName scn : ccap.getUpstreamScns()) {
129 if (upstreamScnMap.containsKey(scn)) {
130 upstreamScnMap.get(scn).add(ccap);
132 final List<Ccaps> ccapList = new ArrayList<>();
134 upstreamScnMap.put(scn, ccapList);
137 // ccap to downstream SCN map
138 for (final ServiceClassName scn : ccap.getDownstreamScns()) {
139 if (downstreamScnMap.containsKey(scn)) {
140 downstreamScnMap.get(scn).add(ccap);
142 final List<Ccaps> ccapList = new ArrayList<>();
144 downstreamScnMap.put(scn, ccapList);
149 private void removeCcapFromAllMaps(final Ccaps ccap) {
150 // remove the ccap from all maps
151 // subscriberSubnets map
152 for (final Map.Entry<Subnet, Ccaps> entry : subscriberSubnetsMap.entrySet()) {
153 if (entry.getValue() == ccap) {
154 subscriberSubnetsMap.remove(entry.getKey());
157 // ccap to upstream SCN map
158 for (final Map.Entry<ServiceClassName, List<Ccaps>> entry : upstreamScnMap.entrySet()) {
159 final List<Ccaps> ccapList = entry.getValue();
160 ccapList.remove(ccap);
161 if (ccapList.isEmpty()) {
162 upstreamScnMap.remove(entry.getKey());
165 // ccap to downstream SCN map
166 for (final Map.Entry<ServiceClassName, List<Ccaps>> entry : downstreamScnMap.entrySet()) {
167 final List<Ccaps> ccapList = entry.getValue();
168 ccapList.remove(ccap);
169 if (ccapList.isEmpty()) {
170 downstreamScnMap.remove(entry.getKey());
174 final PCMMService service = pcmmServiceMap.remove(ccap);
175 if (service != null) service.disconect();
178 private Ccaps findCcapForSubscriberId(final InetAddress inetAddr) {
179 Ccaps matchedCcap = null;
180 int longestPrefixLen = -1;
181 for (final Map.Entry<Subnet, Ccaps> entry : subscriberSubnetsMap.entrySet()) {
182 final Subnet subnet = entry.getKey();
183 if (subnet.isInNet(inetAddr)) {
184 int prefixLen = subnet.getPrefixLen();
185 if (prefixLen > longestPrefixLen) {
186 matchedCcap = entry.getValue();
187 longestPrefixLen = prefixLen;
194 private ServiceFlowDirection findScnOnCcap(final ServiceClassName scn, final Ccaps ccap) {
195 if (upstreamScnMap.containsKey(scn)) {
196 final List<Ccaps> ccapList = upstreamScnMap.get(scn);
197 if (ccapList.contains(ccap)) {
198 return ServiceFlowDirection.Us;
200 } else if (downstreamScnMap.containsKey(scn)) {
201 final List<Ccaps> ccapList = downstreamScnMap.get(scn);
202 if (ccapList.contains(ccap)) {
203 return ServiceFlowDirection.Ds;
210 * Implemented from the DataChangeListener interface.
213 private class InstanceData {
215 public final Map<InstanceIdentifier<Ccaps>, Ccaps> ccapIidMap = new HashMap<>();
218 public final Map<String, String> gatePathMap = new HashMap<>();
219 public String gatePath;
220 public final Map<InstanceIdentifier<Gates>, Gates> gateIidMap = new HashMap<>();
221 // remove path for either CCAP or Gates
222 public final Set<String> removePathList = new HashSet<>();
224 public InstanceData(final Map<InstanceIdentifier<?>, DataObject> thisData) {
225 // only used to parse createdData or updatedData
227 if (ccapIidMap.isEmpty()) {
229 if (! gateIidMap.isEmpty()){
230 gatePath = gatePathMap.get("appId") + "/" + gatePathMap.get("subId");
235 public InstanceData(final Set<InstanceIdentifier<?>> thisData) {
236 // only used to parse the removedData paths
237 for (final InstanceIdentifier<?> removeThis : thisData) {
238 getGatePathMap(removeThis);
239 if (gatePathMap.containsKey("ccapId")) {
240 gatePath = gatePathMap.get("ccapId");
241 removePathList.add(gatePath);
242 } else if (gatePathMap.containsKey("gateId")) {
243 gatePath = gatePathMap.get("appId") + "/" + gatePathMap.get("subId") + "/" + gatePathMap.get("gateId");
244 removePathList.add(gatePath);
248 private void getGatePathMap(final InstanceIdentifier<?> thisInstance) {
249 logger.info("onDataChanged().getGatePathMap(): " + thisInstance);
251 final InstanceIdentifier<Ccaps> ccapInstance = thisInstance.firstIdentifierOf(Ccaps.class);
252 if (ccapInstance != null) {
253 final CcapsKey ccapKey = InstanceIdentifier.keyOf(ccapInstance);
254 if (ccapKey != null) {
255 gatePathMap.put("ccapId", ccapKey.getCcapId());
258 // get the gate path keys from the InstanceIdentifier Map key set if they are there
259 final InstanceIdentifier<Apps> appsInstance = thisInstance.firstIdentifierOf(Apps.class);
260 if (appsInstance != null) {
261 final AppsKey appKey = InstanceIdentifier.keyOf(appsInstance);
262 if (appKey != null) {
263 gatePathMap.put("appId", appKey.getAppId());
266 final InstanceIdentifier<Subs> subsInstance = thisInstance.firstIdentifierOf(Subs.class);
267 if (subsInstance != null) {
268 final SubsKey subKey = InstanceIdentifier.keyOf(subsInstance);
269 if (subKey != null) {
270 subId = subKey.getSubId();
271 gatePathMap.put("subId", subId);
274 final InstanceIdentifier<Gates> gatesInstance = thisInstance.firstIdentifierOf(Gates.class);
275 if (gatesInstance != null) {
276 final GatesKey gateKey = InstanceIdentifier.keyOf(gatesInstance);
277 if (gateKey != null) {
278 gatePathMap.put("gateId", gateKey.getGateId());
282 } catch (ClassCastException err) {
283 logger.warn("Unexpected exception", err);
287 private void getCcaps(final Map<InstanceIdentifier<?>, DataObject> thisData) {
288 logger.info("onDataChanged().getCcaps(): " + thisData);
289 for (final Map.Entry<InstanceIdentifier<?>, DataObject> entry : thisData.entrySet()) {
290 if (entry.getValue() instanceof Ccaps) {
291 // TODO FIXME - Potential ClassCastException thrown here!!!
292 ccapIidMap.put((InstanceIdentifier<Ccaps>)entry.getKey(), (Ccaps)entry.getValue());
297 private void getGates(final Map<InstanceIdentifier<?>, DataObject> thisData) {
298 logger.info("onDataChanged().getGates(): " + thisData);
299 for (final Map.Entry<InstanceIdentifier<?>, DataObject> entry : thisData.entrySet()) {
300 if (entry.getValue() instanceof Gates) {
301 final Gates gate = (Gates)entry.getValue();
303 // TODO FIXME - Potential ClassCastException thrown here!!!
304 final InstanceIdentifier<Gates> gateIID = (InstanceIdentifier<Gates>)entry.getKey();
305 getGatePathMap(gateIID);
306 gateIidMap.put(gateIID, gate);
313 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
314 logger.info("onDataChanged");
315 // Determine what change action took place by looking at the change object's InstanceIdentifier sets
316 // and validate all instance data
317 if (!change.getCreatedData().isEmpty()) {
318 if (!new ValidateInstanceData(dataBroker, change.getCreatedData()).validateYang()) {
319 // leave now -- a bad yang object has been detected and a response object has been inserted
322 onCreate(new InstanceData(change.getCreatedData()));
323 } else if (!change.getRemovedPaths().isEmpty()) {
324 onRemove(new InstanceData(change.getRemovedPaths()));
325 } else if (!change.getUpdatedData().isEmpty()) {
326 if (new ValidateInstanceData(dataBroker, change.getUpdatedData()).isResponseEcho()) {
327 // leave now -- this is an echo of the inserted response object
330 onUpdate(new InstanceData(change.getUpdatedData()));
332 // we should not be here -- complain bitterly and return
333 logger.error("onDataChanged(): Unknown change action: " + change);
337 private void onCreate(final InstanceData thisData) {
338 logger.info("onCreate(): " + thisData);
340 // get the CCAP parameters
342 if (! thisData.ccapIidMap.isEmpty()) {
343 for (Map.Entry<InstanceIdentifier<Ccaps>, Ccaps> entry : thisData.ccapIidMap.entrySet()) {
344 final Ccaps thisCcap = entry.getValue();
345 // get the CCAP node identity from the Instance Data
346 final String ccapId = thisCcap.getCcapId();
348 if (pcmmServiceMap.get(thisCcap) == null) {
349 final PCMMService pcmmService = new PCMMService(IPCMMClient.CLIENT_TYPE, thisCcap);
350 // TODO - may want to use the AMID but for the client type but probably not???
352 final PCMMService pcmmService = new PCMMService(
353 thisCcap.getAmId().getAmType().shortValue(), thisCcap);
355 pcmmServiceMap.put(thisCcap, pcmmService);
356 message = pcmmService.addCcap();
357 if (message.contains("200 OK")) {
358 ccapMap.put(ccapId, thisCcap);
359 updateCcapMaps(thisCcap);
360 logger.info("onDataChanged(): created CCAP: {}/{} : {}", thisData.gatePath, thisCcap, message);
361 logger.info("onDataChanged(): created CCAP: {} : {}", thisData.gatePath, message);
363 // TODO - when a connection cannot be made, need to remove CCAP from ODL cache.
364 logger.error("onDataChanged(): create CCAP Failed: {} : {}", thisData.gatePath, message);
366 // set the response string in the config ccap object using a new thread
367 executor.execute(new Response(dataBroker, entry.getKey(), thisCcap, message));
369 logger.error("Already monitoring CCAP - " + thisCcap);
374 // get the PCMM gate parameters from the ccapId/appId/subId/gateId path in the Maps entry (if new gate)
375 for (final Map.Entry<InstanceIdentifier<Gates>, Gates> entry : thisData.gateIidMap.entrySet()) {
377 final Gates gate = entry.getValue();
378 final String gateId = gate.getGateId();
379 final String gatePathStr = thisData.gatePath + "/" + gateId ;
380 final InetAddress subId = getInetAddress(thisData.subId);
382 final Ccaps thisCcap = findCcapForSubscriberId(subId);
383 if (thisCcap != null) {
384 final String ccapId = thisCcap.getCcapId();
385 // verify SCN exists on CCAP and force gateSpec.Direction to align with SCN direction
386 final ServiceClassName scn = gate.getTrafficProfile().getServiceClassName();
388 final ServiceFlowDirection scnDir = findScnOnCcap(scn, thisCcap);
389 if (scnDir != null) {
390 if (pcmmServiceMap.get(thisCcap) != null) {
391 message = pcmmServiceMap.get(thisCcap).sendGateSet(gatePathStr, subId, gate, scnDir);
392 if (message.contains("200 OK")) {
393 gateMap.put(gatePathStr, gate);
394 gateCcapMap.put(gatePathStr, thisCcap.getCcapId());
395 logger.info("onDataChanged(): created QoS gate {} for {}/{}/{} - {}",
396 gateId, ccapId, gatePathStr, gate, message);
397 logger.info("onDataChanged(): created QoS gate {} for {}/{} - {}",
398 gateId, ccapId, gatePathStr, message);
400 logger.info("onDataChanged(): Unable to create QoS gate {} for {}/{}/{} - {}",
401 gateId, ccapId, gatePathStr, gate, message);
402 logger.error("onDataChanged(): Unable to create QoS gate {} for {}/{} - {}",
403 gateId, ccapId, gatePathStr, message);
406 logger.error("Unable to locate PCMM Service for CCAP - " + thisCcap);
410 logger.error("PCMMService: sendGateSet(): SCN {} not found on CCAP {} for {}/{}",
411 scn.getValue(), thisCcap, gatePathStr, gate);
412 message = String.format("404 Not Found - SCN %s not found on CCAP %s for %s",
413 scn.getValue(), thisCcap.getCcapId(), gatePathStr);
417 final String subIdStr = thisData.subId;
418 message = String.format("404 Not Found - no CCAP found for subscriber %s in %s",
419 subIdStr, gatePathStr);
420 logger.info("onDataChanged(): create QoS gate {} FAILED: no CCAP found for subscriber {}: @ {}/{}",
421 gateId, subIdStr, gatePathStr, gate);
422 logger.error("onDataChanged(): create QoS gate {} FAILED: no CCAP found for subscriber {}: @ {}",
423 gateId, subIdStr, gatePathStr);
426 final String subIdStr = thisData.subId;
427 message = String.format("400 Bad Request - subId must be a valid IP address for subscriber %s in %s",
428 subIdStr, gatePathStr);
429 logger.info("onDataChanged(): create QoS gate {} FAILED: subId must be a valid IP address for subscriber {}: @ {}/{}",
430 gateId, subIdStr, gatePathStr, gate);
431 logger.error("onDataChanged(): create QoS gate {} FAILED: subId must be a valid IP address for subscriber {}: @ {}",
432 gateId, subIdStr, gatePathStr);
434 // set the response message in the config gate object using a new thread
435 executor.execute(new Response(dataBroker, entry.getKey(), gate, message));
440 private void onRemove(final InstanceData thisData) {
441 logger.info("onRemove(): " + thisData);
442 for (final String gatePathStr: thisData.removePathList) {
443 if (gateMap.containsKey(gatePathStr)) {
444 final Gates thisGate = gateMap.remove(gatePathStr);
445 final String gateId = thisGate.getGateId();
446 final String ccapId = gateCcapMap.remove(gatePathStr);
447 final Ccaps thisCcap = ccapMap.get(ccapId);
448 final PCMMService service = pcmmServiceMap.get(thisCcap);
449 if (service != null) {
450 service.sendGateDelete(gatePathStr);
451 logger.info("onDataChanged(): removed QoS gate {} for {}/{}/{}: ", gateId, ccapId, gatePathStr, thisGate);
452 logger.info("onDataChanged(): removed QoS gate {} for {}/{}: ", gateId, ccapId, gatePathStr);
454 logger.warn("Unable to send to locate PCMMService to send gate delete message with CCAP - "
458 for (final String ccapIdStr: thisData.removePathList) {
459 if (ccapMap.containsKey(ccapIdStr)) {
460 final Ccaps thisCcap = ccapMap.remove(ccapIdStr);
461 removeCcapFromAllMaps(thisCcap);
466 private void onUpdate(final InstanceData oldData) {
467 logger.info("onUpdate(): " + oldData);
468 // update operation not allowed -- restore the original config object and complain
469 if (! oldData.ccapIidMap.isEmpty()) {
470 for (final Map.Entry<InstanceIdentifier<Ccaps>, Ccaps> entry : oldData.ccapIidMap.entrySet()) {
471 final Ccaps ccap = entry.getValue();
472 final String ccapId = ccap.getCcapId();
473 String message = String.format("405 Method Not Allowed - %s: CCAP update not permitted (use delete); ",
475 // push new error message onto existing response
476 message += ccap.getResponse();
477 // set the response message in the config object using a new thread -- also restores the original data
478 executor.execute(new Response(dataBroker, entry.getKey(), ccap, message));
479 logger.error("onDataChanged(): CCAP update not permitted {}/{}", ccapId, ccap);
482 for (final Map.Entry<InstanceIdentifier<Gates>, Gates> entry : oldData.gateIidMap.entrySet()) {
483 final Gates gate = entry.getValue();
484 final String gatePathStr = oldData.gatePath + "/" + gate.getGateId() ;
485 String message = String.format("405 Method Not Allowed - %s: QoS Gate update not permitted (use delete); ", gatePathStr);
486 // push new error message onto existing response
487 message += gate.getResponse();
488 // set the response message in the config object using a new thread -- also restores the original data
489 executor.execute(new Response(dataBroker, entry.getKey(), gate, message));
490 logger.error("onDataChanged(): QoS Gate update not permitted: {}/{}", gatePathStr, gate);