X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=packetcable-policy-server%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fpacketcable%2Fprovider%2FPacketcableProvider.java;h=1bbf5b283d63eb75887f53cac290d61413a5c4d3;hb=130b69f3ba304f04434c3cfa46d0f24d34593a9c;hp=dfe737d8b25a1d9e6f1ef567dca9c461e96623b2;hpb=0083a5a611239424af45805d8507699401a8d549;p=packetcable.git diff --git a/packetcable-policy-server/src/main/java/org/opendaylight/controller/packetcable/provider/PacketcableProvider.java b/packetcable-policy-server/src/main/java/org/opendaylight/controller/packetcable/provider/PacketcableProvider.java index dfe737d..1bbf5b2 100644 --- a/packetcable-policy-server/src/main/java/org/opendaylight/controller/packetcable/provider/PacketcableProvider.java +++ b/packetcable-policy-server/src/main/java/org/opendaylight/controller/packetcable/provider/PacketcableProvider.java @@ -1,84 +1,174 @@ +/* + * Copyright (c) 2015 CableLabs and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.packetcable.provider; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; + import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.md.sal.common.api.data.AsyncReadWriteTransaction; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpPrefix; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Prefix; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.Ccap; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.Qos; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceClassName; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceFlowDirection; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ccap.Ccaps; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ccap.CcapsKey; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.Apps; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.AppsKey; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.Subs; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.SubsKey; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.subs.Gates; -import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.pcmm.qos.gates.apps.subs.GatesKey; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.packetcable.provider.validation.DataValidator; +import org.opendaylight.controller.packetcable.provider.validation.ValidationException; +import org.opendaylight.controller.packetcable.provider.validation.Validator; +import org.opendaylight.controller.packetcable.provider.validation.impl.CcapsValidatorProviderFactory; +import org.opendaylight.controller.packetcable.provider.validation.impl.QosValidatorProviderFactory; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; +import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpPrefix; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Prefix; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.AppContext; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.CcapContext; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.CcapPollConnectionInput; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.CcapPollConnectionOutput; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.CcapPollConnectionOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.CcapSetConnectionInput; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.CcapSetConnectionOutput; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.CcapSetConnectionOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.Ccaps; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.PacketcableService; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.Qos; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.QosPollGatesInput; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.QosPollGatesOutput; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.QosPollGatesOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ServiceClassName; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ServiceFlowDirection; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.attributes.ConnectionBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccaps.Ccap; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccaps.CcapBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.Apps; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.App; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.AppBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.AppKey; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.Subscribers; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.SubscribersBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.subscribers.Subscriber; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.subscribers.SubscriberBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.subscribers.SubscriberKey; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.subscribers.subscriber.Gates; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.subscribers.subscriber.GatesBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.subscribers.subscriber.gates.Gate; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.subscribers.subscriber.gates.GateBuilder; +import org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.pcmm.qos.gates.apps.app.subscribers.subscriber.gates.GateKey; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.pcmm.rcd.IPCMMClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.ThreadSafe; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - /** * Called by ODL framework to start this bundle. - * + *

* This class is responsible for processing messages received from ODL's restconf interface. * TODO - Remove some of these state maps and move some of this into the PCMMService + * TODO Don't implement PacketcableService, move that into an inner class */ @ThreadSafe -public class PacketcableProvider implements DataChangeListener, AutoCloseable { +public class PacketcableProvider implements BindingAwareProvider, AutoCloseable, PacketcableService { private static final Logger logger = LoggerFactory.getLogger(PacketcableProvider.class); - // keys to the /restconf/config/packetcable:ccap and /restconf/config/packetcable:qos config datastore - public static final InstanceIdentifier ccapIID = InstanceIdentifier.builder(Ccap.class).build(); - public static final InstanceIdentifier qosIID = InstanceIdentifier.builder(Qos.class).build(); + // keys to the /restconf/config/packetcable:ccaps and /restconf/config/packetcable:qos config datastore + private static final InstanceIdentifier ccapsIID = InstanceIdentifier.builder(Ccaps.class).build(); + private static final InstanceIdentifier qosIID = InstanceIdentifier.builder(Qos.class).build(); + + // TODO - Revisit these maps and remove the ones no longer necessary + private final Map ccapMap = new ConcurrentHashMap<>(); + private final Map gateMap = new ConcurrentHashMap<>(); + private final Map gateCcapMap = new ConcurrentHashMap<>(); + private final Map subscriberSubnetsMap = new ConcurrentHashMap<>(); + private final Map> downstreamScnMap = new ConcurrentHashMap<>(); + private final Map> upstreamScnMap = new ConcurrentHashMap<>(); + + private final Executor executor = Executors.newSingleThreadExecutor(); /** - * The ODL object used to broker messages throughout the framework + * Holds a PCMMService object for each CCAP being managed. */ - private final DataBroker dataBroker; + private final Map pcmmServiceMap = new ConcurrentHashMap<>(); /** - * The thread pool executor + * The ODL object used to broker messages throughout the framework */ - private final ExecutorService executor; + private DataBroker dataBroker; + private MdsalUtils mdsalUtils; - // TODO - Revisit these maps and remove the ones no longer necessary - private final Map ccapMap = new ConcurrentHashMap<>(); - private final Map gateMap = new ConcurrentHashMap<>(); - private final Map gateCcapMap = new ConcurrentHashMap<>(); - private final Map subscriberSubnetsMap = new ConcurrentHashMap<>(); - private final Map> downstreamScnMap = new ConcurrentHashMap<>(); - private final Map> upstreamScnMap = new ConcurrentHashMap<>(); + //Routed RPC Registration + private RoutedRpcRegistration rpcRegistration; - /** - * Holds a PCMMService object for each CCAP being managed. - */ - private final Map pcmmServiceMap = new ConcurrentHashMap<>(); + // Data change listeners/registrations + private final CcapsDataChangeListener ccapsDataChangeListener = new CcapsDataChangeListener(); + private final QosDataChangeListener qosDataChangeListener = new QosDataChangeListener(); + + private ListenerRegistration ccapsDataChangeListenerRegistration; + private ListenerRegistration qosDataChangeListenerRegistration; /** * Constructor */ - public PacketcableProvider(final DataBroker dataBroker) { + public PacketcableProvider() { logger.info("Starting provider"); - this.dataBroker = dataBroker; - executor = Executors.newCachedThreadPool(); + } + + @Override + public void onSessionInitiated(ProviderContext session) { + logger.info("Packetcable Session Initiated"); + logger.info("logging levels: error={}, warn={}, info={}, debug={}, trace={}", logger.isErrorEnabled(), + logger.isWarnEnabled(), logger.isInfoEnabled(), logger.isDebugEnabled(), logger.isTraceEnabled()); + + dataBroker = session.getSALService(DataBroker.class); + + mdsalUtils = new MdsalUtils(dataBroker); + + ccapsDataChangeListenerRegistration = + dataBroker.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, ccapsIID.child(Ccap.class), + ccapsDataChangeListener, DataBroker.DataChangeScope.SUBTREE); + + qosDataChangeListenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, + PacketcableProvider.qosIID.child(Apps.class).child(App.class), qosDataChangeListener, + DataBroker.DataChangeScope.SUBTREE); + + rpcRegistration = session.addRoutedRpcImplementation(PacketcableService.class, this); + logger.info("onSessionInitiated().rpcRgistration: {}", rpcRegistration); + } /** @@ -86,35 +176,16 @@ public class PacketcableProvider implements DataChangeListener, AutoCloseable { */ @Override public void close() throws ExecutionException, InterruptedException { - executor.shutdown(); - if (dataBroker != null) { - // remove our config datastore instances - final AsyncReadWriteTransaction, ?> tx = dataBroker.newReadWriteTransaction(); - tx.delete(LogicalDatastoreType.CONFIGURATION, ccapIID); - tx.delete(LogicalDatastoreType.CONFIGURATION, qosIID); - tx.commit().get(); + if (ccapsDataChangeListenerRegistration != null) { + ccapsDataChangeListenerRegistration.close(); } - } - public InetAddress getInetAddress(final String subId){ - try { - return InetAddress.getByName(subId); - } catch (UnknownHostException e) { - logger.error("getInetAddress: {} FAILED: {}", subId, e.getMessage()); - return null; + if (qosDataChangeListenerRegistration != null) { + qosDataChangeListenerRegistration.close(); } } - private String getIpPrefixStr(final IpPrefix ipPrefix) { - final Ipv4Prefix ipv4 = ipPrefix.getIpv4Prefix(); - if (ipv4 != null) { - return ipv4.getValue(); - } else { - return ipPrefix.getIpv6Prefix().getValue(); - } - } - - private void updateCcapMaps(final Ccaps ccap) { + private void updateCcapMaps(final Ccap ccap) { // add ccap to the subscriberSubnets map for (final IpPrefix ipPrefix : ccap.getSubscriberSubnets()) { try { @@ -128,7 +199,7 @@ public class PacketcableProvider implements DataChangeListener, AutoCloseable { if (upstreamScnMap.containsKey(scn)) { upstreamScnMap.get(scn).add(ccap); } else { - final List ccapList = new ArrayList<>(); + final List ccapList = new ArrayList<>(); ccapList.add(ccap); upstreamScnMap.put(scn, ccapList); } @@ -138,46 +209,36 @@ public class PacketcableProvider implements DataChangeListener, AutoCloseable { if (downstreamScnMap.containsKey(scn)) { downstreamScnMap.get(scn).add(ccap); } else { - final List ccapList = new ArrayList<>(); + final List ccapList = new ArrayList<>(); ccapList.add(ccap); downstreamScnMap.put(scn, ccapList); } } } - private void removeCcapFromAllMaps(final Ccaps ccap) { - // remove the ccap from all maps - // subscriberSubnets map - for (final Map.Entry entry : subscriberSubnetsMap.entrySet()) { - if (entry.getValue() == ccap) { - subscriberSubnetsMap.remove(entry.getKey()); - } - } - // ccap to upstream SCN map - for (final Map.Entry> entry : upstreamScnMap.entrySet()) { - final List ccapList = entry.getValue(); - ccapList.remove(ccap); - if (ccapList.isEmpty()) { - upstreamScnMap.remove(entry.getKey()); - } - } - // ccap to downstream SCN map - for (final Map.Entry> entry : downstreamScnMap.entrySet()) { - final List ccapList = entry.getValue(); - ccapList.remove(ccap); - if (ccapList.isEmpty()) { - downstreamScnMap.remove(entry.getKey()); - } + private String getIpPrefixStr(final IpPrefix ipPrefix) { + final Ipv4Prefix ipv4 = ipPrefix.getIpv4Prefix(); + if (ipv4 != null) { + return ipv4.getValue(); + } else { + return ipPrefix.getIpv6Prefix().getValue(); } + } - final PCMMService service = pcmmServiceMap.remove(ccap); - if (service != null) service.disconect(); + public InetAddress getInetAddress(final String subId) { + try { + return InetAddress.getByName(subId); + } catch (UnknownHostException e) { + logger.error("getInetAddress: {} FAILED: {}", subId, e.getMessage()); + return null; + } } - private Ccaps findCcapForSubscriberId(final InetAddress inetAddr) { - Ccaps matchedCcap = null; + private Ccap findCcapForSubscriberId(final InetAddress inetAddr) { + // TODO replace this with a loading cache, https://github.com/google/guava/wiki/CachesExplained + Ccap matchedCcap = null; int longestPrefixLen = -1; - for (final Map.Entry entry : subscriberSubnetsMap.entrySet()) { + for (final Map.Entry entry : subscriberSubnetsMap.entrySet()) { final Subnet subnet = entry.getKey(); if (subnet.isInNet(inetAddr)) { int prefixLen = subnet.getPrefixLen(); @@ -190,14 +251,17 @@ public class PacketcableProvider implements DataChangeListener, AutoCloseable { return matchedCcap; } - private ServiceFlowDirection findScnOnCcap(final ServiceClassName scn, final Ccaps ccap) { + private ServiceFlowDirection findScnOnCcap(final ServiceClassName scn, final Ccap ccap) { + checkNotNull(scn); + checkNotNull(ccap); + if (upstreamScnMap.containsKey(scn)) { - final List ccapList = upstreamScnMap.get(scn); + final List ccapList = upstreamScnMap.get(scn); if (ccapList.contains(ccap)) { return ServiceFlowDirection.Us; } } else if (downstreamScnMap.containsKey(scn)) { - final List ccapList = downstreamScnMap.get(scn); + final List ccapList = downstreamScnMap.get(scn); if (ccapList.contains(ccap)) { return ServiceFlowDirection.Ds; } @@ -205,286 +269,1054 @@ public class PacketcableProvider implements DataChangeListener, AutoCloseable { return null; } + private void removeCcapFromAllMaps(final Ccap ccap) { + // remove the ccap from all maps + // subscriberSubnets map + for (final Map.Entry entry : subscriberSubnetsMap.entrySet()) { + if (entry.getValue() == ccap) { + subscriberSubnetsMap.remove(entry.getKey()); + } + } + // ccap to upstream SCN map + for (final Map.Entry> entry : upstreamScnMap.entrySet()) { + final List ccapList = entry.getValue(); + ccapList.remove(ccap); + if (ccapList.isEmpty()) { + upstreamScnMap.remove(entry.getKey()); + } + } + // ccap to downstream SCN map + for (final Map.Entry> entry : downstreamScnMap.entrySet()) { + final List ccapList = entry.getValue(); + ccapList.remove(ccap); + if (ccapList.isEmpty()) { + downstreamScnMap.remove(entry.getKey()); + } + } + + final PCMMService service = pcmmServiceMap.remove(ccap.getCcapId()); + if (service != null) { + service.disconect(); + } + } + + // ValidationException does not need to be thrown again + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + private void saveErrors(@Nonnull Map, ValidationException> errorMap, + @Nonnull Map, T> dataMap) { + + final WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); + + + for (InstanceIdentifier iid : errorMap.keySet()) { + + final ValidationException exception = errorMap.get(iid); + final T badData = dataMap.get(iid); + + if (!badData.getImplementedInterface().isAssignableFrom(iid.getTargetType())) { + // InstanceIdentifier does not have the same type as the DataObject + logger.error("Bad InstanceIdentifier to DataObject mapping, {} : {}", iid, badData); + continue; + } + + if (badData instanceof Ccap) { + final Ccap ccap = (Ccap) badData; + + final Ccap opperationalCcap = + new CcapBuilder().setCcapId(ccap.getCcapId()).setError(exception.getErrorMessages()).build(); + + + // type match between iid and badData is done at start of loop + @SuppressWarnings("unchecked") final InstanceIdentifier ccapIID = (InstanceIdentifier) iid; + writeTransaction.put(LogicalDatastoreType.OPERATIONAL, ccapIID, opperationalCcap); + } else if (badData instanceof Gate) { + final Gate gate = (Gate) badData; + + final Gate operationalGate = + new GateBuilder().setGateId(gate.getGateId()).setError(exception.getErrorMessages()).build(); + + final Gates operationalGates = + new GatesBuilder().setGate(Collections.singletonList(operationalGate)).build(); + + final SubscriberKey subscriberKey = InstanceIdentifier.keyOf(iid.firstIdentifierOf(Subscriber.class)); + final Subscriber operationalSubscriber = + new SubscriberBuilder().setSubscriberId(subscriberKey.getSubscriberId()) + .setGates(operationalGates) + .build(); + + final Subscribers operationalSubscribers = + new SubscribersBuilder().setSubscriber(Collections.singletonList(operationalSubscriber)) + .build(); + + final InstanceIdentifier appIID = iid.firstIdentifierOf(App.class); + final AppKey appKey = InstanceIdentifier.keyOf(appIID); + final App operationalApp = + new AppBuilder().setAppId(appKey.getAppId()).setSubscribers(operationalSubscribers).build(); + + + writeTransaction.put(LogicalDatastoreType.OPERATIONAL, appIID, operationalApp); + } else { + // If you get here a developer forgot to add a type above + logger.error("Unexpected type requested for error saving: {}", badData); + throw new IllegalStateException("Unsupported type for error saving"); + } + + } + + + CheckedFuture future = writeTransaction.submit(); + + try { + future.checkedGet(); + } catch (TransactionCommitFailedException e) { + logger.error("Failed to write errors to operational datastore", e); + } + } + + /** + * Removes Ccaps if all Ccap instances are removed + */ + private class CcapsCleaner extends AbstractCleaner { + + public CcapsCleaner(final InstanceIdentifier removedIID) { + super(removedIID, Ccaps.class, LogicalDatastoreType.OPERATIONAL); + } + + @Override + protected boolean shouldClean(final Ccaps ccaps) { + return ccaps.getCcap().isEmpty(); + } + } + + /** - * Implemented from the DataChangeListener interface. + * Removes Subscriber if all Gate instances are removed */ + private class SubscriberCleaner extends AbstractCleaner { + + public SubscriberCleaner(InstanceIdentifier removedGateIID) { + super(removedGateIID, Subscriber.class, LogicalDatastoreType.OPERATIONAL); + } + + @Override + protected boolean shouldClean(final Subscriber subscriber) { + return subscriber.getGates().getGate().isEmpty(); + } + + @Override + protected void postRemove(InstanceIdentifier subscriberIID) { + executor.execute(new AppCleaner(subscriberIID)); + } + } + + + /** + * Removes App if all Subscribers are removed. + */ + private class AppCleaner extends AbstractCleaner { + + public AppCleaner(InstanceIdentifier removedSubscriberIID) { + super(removedSubscriberIID, App.class, LogicalDatastoreType.OPERATIONAL); + } + + @Override + boolean shouldClean(final App app) { + return app.getSubscribers().getSubscriber().isEmpty(); + } + + @Override + void postRemove(final InstanceIdentifier appIID) { + //unregister app rpc path + logger.info("Un-Registering App Routed RPC Path..."); + rpcRegistration.unregisterPath(AppContext.class, appIID); + executor.execute(new AppsCleaner(appIID)); + } + } + + + /** + * Removes Apps if all App instances are removed. + */ + private class AppsCleaner extends AbstractCleaner { + + public AppsCleaner(InstanceIdentifier removedAppIID) { + super(removedAppIID, Apps.class, LogicalDatastoreType.OPERATIONAL); + } + + @Override + protected boolean shouldClean(final Apps apps) { + return apps.getApp().isEmpty(); + } + } + + + /** + * Helper class to do the heavy lifting in removing object. Lets subclasses decide with + * {@link #shouldClean(DataObject)}.
+ *

+ * Subclasses can react after an instance is removed by overriding {@link #postRemove(InstanceIdentifier)} + * + * @param + * The type that will be removed + */ + private abstract class AbstractCleaner implements Runnable { + final InstanceIdentifier removedIID; + final Class tClass; + final LogicalDatastoreType datastoreType; + + public AbstractCleaner(InstanceIdentifier removedIID, Class tClass, LogicalDatastoreType datastoreType) { + this.removedIID = checkNotNull(removedIID); + this.tClass = checkNotNull(tClass); + this.datastoreType = checkNotNull(datastoreType); + } + + @Override + public void run() { + InstanceIdentifier tIID = removedIID.firstIdentifierOf(tClass); + if (tIID != null) { + Optional optional = mdsalUtils.read(datastoreType, tIID); + if (optional.isPresent()) { + + if (shouldClean(optional.get())) { + if (mdsalUtils.delete(datastoreType, tIID)) { + postRemove(tIID); + } else { + removeFailed(tIID); + } + } - private class InstanceData { - // CCAP Identity - public final Map, Ccaps> ccapIidMap = new HashMap<>(); - // Gate Identity - public String subId; - public final Map gatePathMap = new HashMap<>(); - public String gatePath; - public final Map, Gates> gateIidMap = new HashMap<>(); - // remove path for either CCAP or Gates - public final Set removePathList = new HashSet<>(); - - public InstanceData(final Map, DataObject> thisData) { - // only used to parse createdData or updatedData - getCcaps(thisData); - if (ccapIidMap.isEmpty()) { - getGates(thisData); - if (! gateIidMap.isEmpty()){ - gatePath = gatePathMap.get("appId") + "/" + gatePathMap.get("subId"); } + } else { + logger.error("Expected to find InstanceIdentifier<{}> but was not found: {}", tClass.getSimpleName(), + removedIID); } } - public InstanceData(final Set> thisData) { - // only used to parse the removedData paths - for (final InstanceIdentifier removeThis : thisData) { - getGatePathMap(removeThis); - if (gatePathMap.containsKey("ccapId")) { - gatePath = gatePathMap.get("ccapId"); - removePathList.add(gatePath); - } else if (gatePathMap.containsKey("gateId")) { - gatePath = gatePathMap.get("appId") + "/" + gatePathMap.get("subId") + "/" + gatePathMap.get("gateId"); - removePathList.add(gatePath); + /** + * If returns true the object will be removed from the datastore + * + * @param object + * The object that might be removed. + * @return true if it should be removed. + */ + abstract boolean shouldClean(final T object); + + /** + * Called after an instance is removed. + * + * @param tIID + * the InstanceIdentifier of the removed object + */ + void postRemove(InstanceIdentifier tIID) { + + } + + void removeFailed(InstanceIdentifier tIID) { + logger.error("Failed to remove {}", tIID); + } + } + + + /** + * Listener for the packetcable:ccaps tree + */ + private class CcapsDataChangeListener extends AbstractDataChangeListener { + + private final DataValidator ccapsDataValidator = new DataValidator(new CcapsValidatorProviderFactory().build()); + + private final Set> updateQueue = Sets.newConcurrentHashSet(); + + public CcapsDataChangeListener() { + super(Ccap.class); + } + + @Override + protected void handleCreatedData(final Map, Ccap> createdCcaps) { + if (createdCcaps.isEmpty()) { + return; + } + + final Map, ValidationException> errorMap = + ccapsDataValidator.validateOneType(createdCcaps, Validator.Extent.NODE_AND_SUBTREE); + + // validate all new objects an update operational datastore + if (!errorMap.isEmpty()) { + // bad data write errors to operational datastore + saveErrors(errorMap, createdCcaps); + } + + if (createdCcaps.size() > errorMap.size()) { + final Map, Ccap> goodData = + Maps.newHashMapWithExpectedSize(createdCcaps.size() - errorMap.size()); + for (InstanceIdentifier iid : createdCcaps.keySet()) { + if (!errorMap.containsKey(iid)) { + goodData.put(iid, createdCcaps.get(iid)); + } } + addNewCcaps(goodData); } } - private void getGatePathMap(final InstanceIdentifier thisInstance) { - logger.info("onDataChanged().getGatePathMap(): " + thisInstance); - try { - final InstanceIdentifier ccapInstance = thisInstance.firstIdentifierOf(Ccaps.class); - if (ccapInstance != null) { - final CcapsKey ccapKey = InstanceIdentifier.keyOf(ccapInstance); - if (ccapKey != null) { - gatePathMap.put("ccapId", ccapKey.getCcapId()); - } + + private void addNewCcaps(final Map, Ccap> goodData) { + for (InstanceIdentifier iid : goodData.keySet()) { + final Ccap ccap = goodData.get(iid); + + // add service + if (pcmmServiceMap.containsKey(ccap.getCcapId())) { + logger.error("Already monitoring CCAP - " + ccap); + continue; + } + final PCMMService pcmmService = new PCMMService(IPCMMClient.CLIENT_TYPE, ccap); + // TODO - may want to use the AMID but for the client type but probably not??? +/* + final PCMMService pcmmService = new PCMMService( + thisCcap.getAmId().getAmType().shortValue(), thisCcap); +*/ + ConnectionBuilder connectionBuilder = new ConnectionBuilder(); + String message = pcmmService.addCcap(); + if (message.contains("200 OK")) { + pcmmServiceMap.put(ccap.getCcapId(), pcmmService); + ccapMap.put(ccap.getCcapId(), ccap); + updateCcapMaps(ccap); + logger.info("Created CCAP: {}/{} : {}", iid, ccap, message); + logger.info("Created CCAP: {} : {}", iid, message); + + connectionBuilder.setConnected(true).setError(Collections.emptyList()); } else { - // get the gate path keys from the InstanceIdentifier Map key set if they are there - final InstanceIdentifier appsInstance = thisInstance.firstIdentifierOf(Apps.class); - if (appsInstance != null) { - final AppsKey appKey = InstanceIdentifier.keyOf(appsInstance); - if (appKey != null) { - gatePathMap.put("appId", appKey.getAppId()); - } - } - final InstanceIdentifier subsInstance = thisInstance.firstIdentifierOf(Subs.class); - if (subsInstance != null) { - final SubsKey subKey = InstanceIdentifier.keyOf(subsInstance); - if (subKey != null) { - subId = subKey.getSubId(); - gatePathMap.put("subId", subId); - } + logger.error("Create CCAP Failed: {} : {}", iid, message); + + connectionBuilder.setConnected(false).setError(Collections.singletonList(message)); + } + + //register rpc + logger.info("Registering CCAP Routed RPC Path..."); + rpcRegistration.registerPath(CcapContext.class, iid); + + Optional optionalCcap = mdsalUtils.read(LogicalDatastoreType.OPERATIONAL, iid); + + final CcapBuilder responseCcapBuilder; + if (optionalCcap.isPresent()) { + responseCcapBuilder = new CcapBuilder(optionalCcap.get()); + } else { + responseCcapBuilder = new CcapBuilder(); + responseCcapBuilder.setCcapId(ccap.getCcapId()); + } + + responseCcapBuilder.setConnection(connectionBuilder.build()); + + mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, iid, responseCcapBuilder.build()); + } + + } + + @Override + protected void handleUpdatedData(final Map, Ccap> updatedCcaps, + final Map, Ccap> originalCcaps) { + + // TODO actually support updates + + // update operation not allowed -- restore the original config object and complain + for (final Map.Entry, Ccap> entry : updatedCcaps.entrySet()) { + if (!originalCcaps.containsKey(entry.getKey())) { + logger.error("No original data found for supposedly updated data: {}", entry.getValue()); + continue; + } + + // If this notification is coming from our modification ignore it. + if (updateQueue.contains(entry.getKey())) { + updateQueue.remove(entry.getKey()); + continue; + } + + final Ccap originalCcap = originalCcaps.get(entry.getKey()); + //final Ccap updatedCcap = entry.getValue(); + + //register rpc + logger.info("Registering CCAP Routed RPC Path..."); + rpcRegistration.registerPath(CcapContext.class, entry.getKey()); + + // restore the original data + updateQueue.add(entry.getKey()); + mdsalUtils.put(LogicalDatastoreType.CONFIGURATION, entry.getKey(), originalCcap); + logger.error("CCAP update not permitted {}", entry.getKey()); + } + } + + @Override + protected void handleRemovedData(final Set> removedCcapPaths, + final Map, Ccap> originalCcaps) { + + for (InstanceIdentifier iid : removedCcapPaths) { + final Ccap nukedCcap = originalCcaps.get(iid); + removeCcapFromAllMaps(nukedCcap); + + //unregister ccap rpc path + logger.info("Un-Registering CCAP Routed RPC Path..."); + rpcRegistration.unregisterPath(CcapContext.class, iid); + + mdsalUtils.delete(LogicalDatastoreType.OPERATIONAL, iid); + + // clean up ccaps level if it is now empty + executor.execute(new CcapsCleaner(iid)); + } + + } + } + + + private class QosDataChangeListener extends AbstractDataChangeListener { + + private final DataValidator qosDataValidator = new DataValidator(new QosValidatorProviderFactory().build()); + private final Set> updateQueue = Sets.newConcurrentHashSet(); + + public QosDataChangeListener() { + super(Gate.class); + } + + @Override + protected void handleCreatedData(final Map, Gate> createdData) { + + final Map, ValidationException> errorMap = + qosDataValidator.validateOneType(createdData, Validator.Extent.NODE_AND_SUBTREE); + + // validate all new objects an update operational datastore + if (!errorMap.isEmpty()) { + // bad data write errors to operational datastore + saveErrors(errorMap, createdData); + } + + if (createdData.size() > errorMap.size()) { + final Map, Gate> goodData = + Maps.newHashMapWithExpectedSize(createdData.size() - errorMap.size()); + for (InstanceIdentifier iid : createdData.keySet()) { + if (!errorMap.containsKey(iid)) { + goodData.put(iid, createdData.get(iid)); } - final InstanceIdentifier gatesInstance = thisInstance.firstIdentifierOf(Gates.class); - if (gatesInstance != null) { - final GatesKey gateKey = InstanceIdentifier.keyOf(gatesInstance); - if (gateKey != null) { - gatePathMap.put("gateId", gateKey.getGateId()); + } + addNewGates(goodData); + } + + } + + private void addNewGates(final Map, Gate> createdGates) { + + for (InstanceIdentifier gateIID : createdGates.keySet()) { + final Gate newGate = createdGates.get(gateIID); + + final String newGatePathStr = makeGatePathString(gateIID); + + // if a new app comes along add RPC registration + final InstanceIdentifier appIID = gateIID.firstIdentifierOf(App.class); + // TBD verify if App ID exists first + + //register appID RPC path + logger.info("Registering App Routed RPC Path..."); + rpcRegistration.registerPath(AppContext.class, appIID); + + final InstanceIdentifier subscriberIID = gateIID.firstIdentifierOf(Subscriber.class); + final SubscriberKey subscriberKey = InstanceIdentifier.keyOf(subscriberIID); + final InetAddress subscriberAddr = getInetAddress(subscriberKey.getSubscriberId()); + if (subscriberAddr == null) { + final String msg = String.format("subscriberId must be a valid ipaddress: %s", + subscriberKey.getSubscriberId()); + logger.error(msg); + saveGateError(gateIID, newGatePathStr, msg); + continue; + } + + final Ccap ccap = findCcapForSubscriberId(subscriberAddr); + if (ccap == null) { + final String msg = String.format("Unable to find Ccap for subscriber %s: @ %s", + subscriberKey.getSubscriberId(), newGatePathStr); + logger.error(msg); + saveGateError(gateIID, newGatePathStr, msg); + continue; + } + + final ServiceClassName scn = newGate.getTrafficProfile().getServiceClassName(); + final ServiceFlowDirection scnDirection = findScnOnCcap(scn, ccap); + if (scnDirection == null) { + final String msg = + String.format("SCN %s not found on CCAP %s for %s", scn, ccap.getCcapId(), newGatePathStr); + logger.error(msg); + saveGateError(gateIID, newGatePathStr, msg); + continue; + } + + final PCMMService pcmmService = pcmmServiceMap.get(ccap.getCcapId()); + if (pcmmService == null) { + final String msg = + String.format("Unable to locate PCMM Service for CCAP: %s ; with subscriber: %s", ccap, + subscriberKey.getSubscriberId()); + logger.error(msg); + saveGateError(gateIID, newGatePathStr, msg); + continue; + } + + PCMMService.GateSendStatus status = + pcmmService.sendGateSet(newGatePathStr, subscriberAddr, newGate, scnDirection); + if (status.didSucceed()) { + gateMap.put(newGatePathStr, newGate); + gateCcapMap.put(newGatePathStr, ccap.getCcapId()); + } + final GateBuilder gateBuilder = new GateBuilder(); + gateBuilder.setGateId(newGate.getGateId()) + .setGatePath(newGatePathStr) + .setCcapId(ccap.getCcapId()) + .setCopsGateId(status.getCopsGateId()) + .setCopsGateState("") + .setTimestamp(getNowTimeStamp()) + .setCopsGateTimeInfo("") + .setCopsGateUsageInfo("") + .setTimestamp(getNowTimeStamp()); + + if (!status.didSucceed()) { + gateBuilder.setError(Collections.singletonList(status.getMessage())); + } else { + PCMMService.GateSendStatus infoStatus = pcmmService.sendGateInfo(newGatePathStr); + + if (infoStatus.didSucceed()) { + gateBuilder.setCopsGateState( + infoStatus.getCopsGateState() + "/" + infoStatus.getCopsGateStateReason()) + .setCopsGateTimeInfo(infoStatus.getCopsGateTimeInfo()) + .setCopsGateUsageInfo(infoStatus.getCopsGateUsageInfo()); + } else { + List errors = new ArrayList<>(2); + + // Keep GateSetErrors + if (gateBuilder.getError() != null) { + errors.addAll(gateBuilder.getError()); } + + errors.add(infoStatus.getMessage()); + gateBuilder.setError(errors); } + } - } catch (ClassCastException err) { - logger.warn("Unexpected exception", err); + + Gate operationalGate = gateBuilder.build(); + + mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, gateIID, operationalGate); + } + + } + + private void saveGateError(@Nonnull final InstanceIdentifier gateIID, @Nonnull final String gatePathStr, + @Nonnull final String error) { + checkNotNull(gateIID); + checkNotNull(error); + + final GateBuilder gateBuilder = new GateBuilder(); + gateBuilder.setGateId(InstanceIdentifier.keyOf(gateIID).getGateId()) + .setGatePath(gatePathStr) + .setCopsGateId("") + .setCopsGateState("N/A"); + + gateBuilder.setError(Collections.singletonList(error)); + + Gate operationalGate = gateBuilder.build(); + + mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, gateIID, operationalGate); } - private void getCcaps(final Map, DataObject> thisData) { - logger.info("onDataChanged().getCcaps(): " + thisData); - for (final Map.Entry, DataObject> entry : thisData.entrySet()) { - if (entry.getValue() instanceof Ccaps) { - ccapIidMap.put((InstanceIdentifier)entry.getKey(), (Ccaps)entry.getValue()); + @Override + protected void handleUpdatedData(final Map, Gate> updatedData, + final Map, Gate> originalData) { + // TODO actually support updates + + // update operation not allowed -- restore the original config object and complain + for (final Map.Entry, Gate> entry : updatedData.entrySet()) { + if (!originalData.containsKey(entry.getKey())) { + logger.error("No original data found for supposedly updated data: {}", entry.getValue()); + continue; + } + + // If this notification is coming from our modification ignore it. + if (updateQueue.contains(entry.getKey())) { + updateQueue.remove(entry.getKey()); + continue; } + + final Gate originalGate = originalData.get(entry.getKey()); + + // restores the original data + updateQueue.add(entry.getKey()); + mdsalUtils.put(LogicalDatastoreType.CONFIGURATION, entry.getKey(), originalGate); + logger.error("Update not permitted {}", entry.getKey()); + } } - private void getGates(final Map, DataObject> thisData) { - logger.info("onDataChanged().getGates(): " + thisData); - for (final Map.Entry, DataObject> entry : thisData.entrySet()) { - if (entry.getValue() instanceof Gates) { - final Gates gate = (Gates)entry.getValue(); - final InstanceIdentifier gateIID = (InstanceIdentifier)entry.getKey(); - getGatePathMap(gateIID); - gateIidMap.put(gateIID, gate); + + + @Override + protected void handleRemovedData(final Set> removedPaths, + final Map, Gate> originalData) { + + for (final InstanceIdentifier removedGateIID : removedPaths) { + + mdsalUtils.delete(LogicalDatastoreType.OPERATIONAL, removedGateIID); + + executor.execute(new SubscriberCleaner(removedGateIID)); + + final String gatePathStr = makeGatePathString(removedGateIID); + + if (gateMap.containsKey(gatePathStr)) { + final Gate thisGate = gateMap.remove(gatePathStr); + final String gateId = thisGate.getGateId(); + final String ccapId = gateCcapMap.remove(gatePathStr); + final Ccap thisCcap = ccapMap.get(ccapId); + final PCMMService service = pcmmServiceMap.get(thisCcap.getCcapId()); + if (service != null) { + service.sendGateDelete(gatePathStr); + logger.info("onDataChanged(): removed QoS gate {} for {}/{}/{}: ", gateId, ccapId, gatePathStr, + thisGate); + } else { + logger.warn("Unable to send to locate PCMMService to send gate delete message with CCAP - " + + thisCcap); + } } + + } + + } + + private String makeGatePathString(InstanceIdentifier iid) { + final InstanceIdentifier appIID = iid.firstIdentifierOf(App.class); + final AppKey appKey = InstanceIdentifier.keyOf(appIID); + + final InstanceIdentifier subscriberIID = iid.firstIdentifierOf(Subscriber.class); + final SubscriberKey subscriberKey = InstanceIdentifier.keyOf(subscriberIID); + + final GateKey gateKey = InstanceIdentifier.keyOf(iid); + + return appKey.getAppId() + "/" + subscriberKey.getSubscriberId() + "/" + gateKey.getGateId(); } } + @Override - public void onDataChanged(final AsyncDataChangeEvent, DataObject> change) { - logger.info("onDataChanged"); - // Determine what change action took place by looking at the change object's InstanceIdentifier sets - // and validate all instance data - if (!change.getCreatedData().isEmpty()) { - if (!new ValidateInstanceData(dataBroker, change.getCreatedData()).validateYang()) { - // leave now -- a bad yang object has been detected and a response object has been inserted - return; + public Future> ccapSetConnection(CcapSetConnectionInput input) { + // TODO refactor this method into smaller parts + + InstanceIdentifier ccapIid = (InstanceIdentifier) input.getCcapId(); + List outputError = new ArrayList(); + String rpcResponse = null; + Boolean inputIsConnected = input.getConnection().isConnected(); + Boolean effectiveIsConnected = null; + String ccapId = input.getCcapId().firstIdentifierOf(Ccap.class).firstKeyOf(Ccap.class).getCcapId(); + PCMMService pcmmService = pcmmServiceMap.get(ccapId); + + if (!inputIsConnected) { + // set connected false + if (pcmmService.getPcmmPdpSocket()) { + outputError.add(ccapId + ": CCAP COPS socket is already closed"); + effectiveIsConnected = false; + } else { + //if (!pcmmService.getPcmmCcapClientIsConnected()) { + outputError.add(ccapId + ": CCAP client is disconnected with error: " + + pcmmService.getPcmmCcapClientConnectErrMsg()); + //} + pcmmService.ccapClient.disconnect(); + effectiveIsConnected = false; } - onCreate(new InstanceData(change.getCreatedData())); - } else if (!change.getRemovedPaths().isEmpty()) { - onRemove(new InstanceData(change.getRemovedPaths())); - } else if (!change.getUpdatedData().isEmpty()) { - if (new ValidateInstanceData(dataBroker, change.getUpdatedData()).isResponseEcho()) { - // leave now -- this is an echo of the inserted response object - return; + } else { + // set connected true + if (!pcmmService.getPcmmPdpSocket() && pcmmService.getPcmmCcapClientIsConnected()) { + outputError.add(ccapId + ": CCAP COPS socket is already open"); + outputError.add(ccapId + ": CCAP client is connected"); + effectiveIsConnected = true; + } else { + if (pcmmService.getPcmmCcapClientIsConnected()) { + pcmmService.ccapClient.disconnect(); + } + pcmmService.ccapClient.connect(); + if (pcmmService.getPcmmCcapClientIsConnected()) { + effectiveIsConnected = true; + outputError.add(ccapId + ": CCAP client is connected"); + } else { + effectiveIsConnected = false; + outputError.add(ccapId + ": CCAP client is disconnected with error: " + + pcmmService.getPcmmCcapClientConnectErrMsg()); + } } - onUpdate(new InstanceData(change.getUpdatedData())); + } + + DateAndTime connectionDateAndTime = getNowTimeStamp(); + org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.set.connection.output.ccap.ConnectionBuilder + connectionRpcOutput = + new org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.set.connection.output.ccap.ConnectionBuilder() + .setConnected(effectiveIsConnected) + .setError(outputError) + .setTimestamp(connectionDateAndTime); + + org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.set.connection.output.CcapBuilder ccapRpcOutput = + new org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.set.connection.output.CcapBuilder().setCcapId( + ccapId).setConnection(connectionRpcOutput.build()); + + + ConnectionBuilder connectionOps = new ConnectionBuilder().setConnected(effectiveIsConnected) + .setError(outputError) + .setTimestamp(connectionDateAndTime); + + CcapBuilder responseCcapBuilder = new CcapBuilder().setCcapId(ccapId).setConnection(connectionOps.build()); + + + mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, ccapIid, responseCcapBuilder.build()); + + + DateAndTime rpcDateAndTime = getNowTimeStamp(); + rpcResponse = ccapId + ": CCAP set complete"; + CcapSetConnectionOutputBuilder outputBuilder = + new CcapSetConnectionOutputBuilder().setCcap(ccapRpcOutput.build()) + .setResponse(rpcResponse) + .setTimestamp(rpcDateAndTime); + + return Futures.immediateFuture(RpcResultBuilder.success(outputBuilder.build()).build()); + } + + + + @Override + public Future> ccapPollConnection(CcapPollConnectionInput input) { + // TODO refactor this method into smaller parts + + InstanceIdentifier ccapIid = (InstanceIdentifier) input.getCcapId(); + List outputError = new ArrayList(); + + String ccapId = input.getCcapId().firstIdentifierOf(Ccap.class).firstKeyOf(Ccap.class).getCcapId(); + PCMMService pcmmService = pcmmServiceMap.get(ccapId); + Boolean effectiveIsConnected = true; + String response = null; + org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.poll.connection.output.ccap.ConnectionBuilder + connectionRpcOutput = + new org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.poll.connection.output.ccap.ConnectionBuilder(); + + if (pcmmService != null) { + if (pcmmService.getPcmmPdpSocket()) { + outputError.add(ccapId + ": CCAP Cops socket is closed"); + if (!pcmmService.getPcmmCcapClientIsConnected()) { + outputError.add(ccapId + ": CCAP client is disconnected with error: " + + pcmmService.getPcmmCcapClientConnectErrMsg()); + } + effectiveIsConnected = false; + } else { + //outputError.add(String.format(ccapId+": CCAP Cops socket is open")); + if (!pcmmService.getPcmmCcapClientIsConnected()) { + outputError.add(ccapId + ": CCAP client is disconnected with error: " + + pcmmService.getPcmmCcapClientConnectErrMsg()); + effectiveIsConnected = false; + } else { + outputError.add(ccapId + ": CCAP client is connected"); + } + } + DateAndTime connectionDateAndTime = getNowTimeStamp(); + + + ConnectionBuilder connectionOps = new ConnectionBuilder().setConnected(effectiveIsConnected) + .setError(outputError) + .setTimestamp(connectionDateAndTime); + + CcapBuilder responseCcapBuilder = new CcapBuilder().setCcapId(ccapId).setConnection(connectionOps.build()); + + connectionRpcOutput = + new org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.poll.connection.output.ccap.ConnectionBuilder() + .setConnected(effectiveIsConnected) + .setError(outputError) + .setTimestamp(connectionDateAndTime); + + mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, ccapIid, responseCcapBuilder.build()); + response = ccapId + ": CCAP poll complete"; } else { - // we should not be here -- complain bitterly and return - logger.error("onDataChanged(): Unknown change action: " + change); + //pcmmService is null, do not poll + response = ccapId + ": CCAP connection null; no poll performed"; } + + DateAndTime rpcDateAndTime = getNowTimeStamp(); + + org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.poll.connection.output.CcapBuilder ccapRpcOutput = + new org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.ccap.poll.connection.output.CcapBuilder().setCcapId( + ccapId).setConnection(connectionRpcOutput.build()); + + CcapPollConnectionOutputBuilder outputBuilder = + new CcapPollConnectionOutputBuilder().setCcap(ccapRpcOutput.build()) + .setResponse(response) + .setTimestamp(rpcDateAndTime); + + return Futures.immediateFuture(RpcResultBuilder.success(outputBuilder.build()).build()); } - private void onCreate(final InstanceData thisData) { - logger.info("onCreate(): " + thisData); - // get the CCAP parameters - String message; - if (! thisData.ccapIidMap.isEmpty()) { - for (Map.Entry, Ccaps> entry : thisData.ccapIidMap.entrySet()) { - final Ccaps thisCcap = entry.getValue(); - // get the CCAP node identity from the Instance Data - final String ccapId = thisCcap.getCcapId(); - if (pcmmServiceMap.get(thisCcap) == null) { - final PCMMService pcmmService = new PCMMService(IPCMMClient.CLIENT_TYPE, thisCcap); - // TODO - may want to use the AMID but for the client type but probably not??? -/* - final PCMMService pcmmService = new PCMMService( - thisCcap.getAmId().getAmType().shortValue(), thisCcap); -*/ - pcmmServiceMap.put(thisCcap, pcmmService); - message = pcmmService.addCcap(); - if (message.contains("200 OK")) { - ccapMap.put(ccapId, thisCcap); - updateCcapMaps(thisCcap); - logger.info("onDataChanged(): created CCAP: {}/{} : {}", thisData.gatePath, thisCcap, message); - logger.info("onDataChanged(): created CCAP: {} : {}", thisData.gatePath, message); + private App readAppFromOperationalDatastore(InstanceIdentifier appIid) { + Optional optionalApp = mdsalUtils.read(LogicalDatastoreType.OPERATIONAL, appIid); + AppBuilder thisAppBuilder = new AppBuilder(optionalApp.get()); + App thisApp = thisAppBuilder.build(); + logger.info("readAppFromConfigDatastore() retrived App: " + thisApp.getAppId()); + return thisApp; + } + + private Gate readGateFromOperationalDatastore(InstanceIdentifier gateIid) { + Optional optionalGate = mdsalUtils.read(LogicalDatastoreType.OPERATIONAL, gateIid); + if (optionalGate.isPresent()) { + GateBuilder gateBuilder = new GateBuilder(optionalGate.get()); + Gate thisGate = gateBuilder.build(); + return thisGate; + } else { + return null; + } + } + + private Subscriber readSubscriberFromOperationalDatastore(InstanceIdentifier subscriberIid) { + Optional optionalSubscriber = mdsalUtils.read(LogicalDatastoreType.OPERATIONAL, subscriberIid); + if (optionalSubscriber.isPresent()) { + SubscriberBuilder subscriberBuilder = new SubscriberBuilder(optionalSubscriber.get()); + Subscriber thisSubscriber = subscriberBuilder.build(); + return thisSubscriber; + } else { + return null; + } + } + + + + @Override + public Future> qosPollGates(QosPollGatesInput input) { + // TODO refactor this method into smaller parts + + InstanceIdentifier appIid = (InstanceIdentifier) input.getAppId(); + //logger.info("qospollgates appIid : "+appIid.toString()); + App app = readAppFromOperationalDatastore(appIid); + //logger.info("qospollgates app : "+app.toString()); + AppKey appKey = InstanceIdentifier.keyOf(appIid); + String inputSubscriberId = input.getSubscriberId(); + String inputGateId = input.getGateId(); + List gateOutputError = Collections.emptyList(); + String subscriberId = null; + String gateId = null; + String ccapId = null; + String gatePathStr = null; + String opsCopsGateId = null; + Gate opsGate = null; + + String rpcResponse = null; + + org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.qos.poll.gates.output.GateBuilder gateOutputBuilder = + new org.opendaylight.yang.gen.v1.urn.packetcable.rev151101.qos.poll.gates.output.GateBuilder(); + + GateBuilder gateBuilder = new GateBuilder(); + + if (inputSubscriberId != null) { + if (inputGateId != null) { + //Subscriber Id and Gate Id provided, only one gate to be poolled + + //generate the gateiid + InstanceIdentifier gateIid = appIid.builder() + .child(Subscribers.class) + .child(Subscriber.class, new SubscriberKey(inputSubscriberId)) + .child(Gates.class) + .child(Gate.class, new GateKey(inputGateId)) + .build(); + + + opsGate = readGateFromOperationalDatastore(gateIid); + + //does the gate exists in the Operational DS? + if (opsGate == null) { + gatePathStr = appKey.getAppId() + "/" + inputSubscriberId + "/" + inputGateId; + rpcResponse = gatePathStr + ": gate does not exist in the system; gate poll not performed"; + } else { + opsCopsGateId = opsGate.getCopsGateId(); + gatePathStr = opsGate.getGatePath(); + + if ((!Objects.equals(opsCopsGateId, "")) && (!Objects.equals(opsCopsGateId, null))) { + ccapId = findCcapForSubscriberId(getInetAddress(inputSubscriberId)).getCcapId(); + PCMMService pcmmService = pcmmServiceMap.get(ccapId); + //is the CCAP socket open? + if (!pcmmService.getPcmmPdpSocket() && pcmmService.getPcmmCcapClientIsConnected()) { + PCMMService.GateSendStatus status = pcmmService.sendGateInfo(gatePathStr); + DateAndTime gateDateAndTime = getNowTimeStamp(); + //logger.info("qospollgates Gate Status : GateID/"+status.getCopsGateId()); + //logger.info("qospollgates Gate Status : Message/"+status.getMessage()); + //logger.info("qospollgates Gate Status : DidSucceed/"+status.didSucceed()); + gateOutputError = Collections.singletonList(status.getMessage()); + + gateOutputBuilder.setGatePath(gatePathStr) + .setCcapId(ccapId) + .setCopsGateState(status.getCopsGateState() + "/" + status.getCopsGateStateReason()) + .setCopsGateTimeInfo(status.getCopsGateTimeInfo()) + .setCopsGateUsageInfo(status.getCopsGateUsageInfo()) + .setCopsGateId(status.getCopsGateId()) + .setError(gateOutputError) + .setTimestamp(gateDateAndTime); + + gateBuilder.setGateId(inputGateId) + .setGatePath(gatePathStr) + .setCcapId(ccapId) + .setCopsGateState(status.getCopsGateState() + "/" + status.getCopsGateStateReason()) + .setCopsGateTimeInfo(status.getCopsGateTimeInfo()) + .setCopsGateUsageInfo(status.getCopsGateUsageInfo()) + .setCopsGateId(status.getCopsGateId()) + .setError(gateOutputError) + .setTimestamp(gateDateAndTime); + + mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, gateIid, gateBuilder.build()); + rpcResponse = gatePathStr + ": gate poll complete"; + } else { + rpcResponse = + ccapId + ": CCAP socket is down or client disconnected; gate poll not performed"; + } } else { - logger.error("onDataChanged(): create CCAP Failed: {} : {}", thisData.gatePath, message); + rpcResponse = gatePathStr + ": gate not active; gate poll not performed"; } - // set the response string in the config ccap object using a new thread - executor.execute(new Response(dataBroker, entry.getKey(), thisCcap, message)); + } + } else { + //inputGateId is null; pool all gates for the subscriber if the sub exists + + //generate active subIid + InstanceIdentifier subIid = appIid.builder() + .child(Subscribers.class) + .child(Subscriber.class, new SubscriberKey(inputSubscriberId)) + .build(); + //does the subscriber provided exists in the Operational Datastore? + Subscriber sub = readSubscriberFromOperationalDatastore(subIid); + if (sub != null) { + //If Subscriber exsits poll all gates for the subscriber + subscriberId = sub.getSubscriberId(); + List gateList = sub.getGates().getGate(); + for (Gate gate : gateList) { + //generate active gateIid + gateId = gate.getGateId(); + InstanceIdentifier gateIid = + subIid.builder().child(Gates.class).child(Gate.class, new GateKey(gateId)).build(); + + opsGate = readGateFromOperationalDatastore(gateIid); + opsCopsGateId = opsGate.getCopsGateId(); + //generate active gatePathStr + gatePathStr = appKey.getAppId() + "/" + subscriberId + "/" + gateId; + + if ((!Objects.equals(opsCopsGateId, "")) && (!Objects.equals(opsCopsGateId, null))) { + ccapId = findCcapForSubscriberId(getInetAddress(subscriberId)).getCcapId(); + PCMMService pcmmService = pcmmServiceMap.get(ccapId); + //is the CCAP socket open? + if (!pcmmService.getPcmmPdpSocket() && pcmmService.getPcmmCcapClientIsConnected()) { + PCMMService.GateSendStatus status = pcmmService.sendGateInfo(gatePathStr); + DateAndTime gateDateAndTime = getNowTimeStamp(); + + gateBuilder.setGateId(gateId) + .setGatePath(gatePathStr) + .setCcapId(ccapId) + .setCopsGateState( + status.getCopsGateState() + "/" + status.getCopsGateStateReason()) + .setCopsGateTimeInfo(status.getCopsGateTimeInfo()) + .setCopsGateUsageInfo(status.getCopsGateUsageInfo()) + .setCopsGateId(status.getCopsGateId()) + .setError(gateOutputError) + .setTimestamp(gateDateAndTime); + + mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, gateIid, gateBuilder.build()); + } else { + logger.info( + "qospollgates: {}: CCAP Cops socket is down or client disconnected; gate poll not performed", + ccapId); + } + } else { + //TODO define what happens if a gate is not active.. is nothing ok? + logger.info("qospollgates: {}: gate not active; gate poll not performed", gatePathStr); + } + } //for + rpcResponse = inputSubscriberId + "/: subscriber subtree poll in progress"; } else { - logger.error("Already monitoring CCAP - " + thisCcap); - break; + rpcResponse = + inputSubscriberId + "/: subscriber is not defined in the system, gate poll not performed"; } } - } else { - // get the PCMM gate parameters from the ccapId/appId/subId/gateId path in the Maps entry (if new gate) - for (final Map.Entry, Gates> entry : thisData.gateIidMap.entrySet()) { - message = null; - final Gates gate = entry.getValue(); - final String gateId = gate.getGateId(); - final String gatePathStr = thisData.gatePath + "/" + gateId ; - final InetAddress subId = getInetAddress(thisData.subId); - if (subId != null) { - final Ccaps thisCcap = findCcapForSubscriberId(subId); - if (thisCcap != null) { - final String ccapId = thisCcap.getCcapId(); - // verify SCN exists on CCAP and force gateSpec.Direction to align with SCN direction - final ServiceClassName scn = gate.getTrafficProfile().getServiceClassName(); - if (scn != null) { - final ServiceFlowDirection scnDir = findScnOnCcap(scn, thisCcap); - if (scnDir != null) { - if (pcmmServiceMap.get(thisCcap) != null) { - message = pcmmServiceMap.get(thisCcap).sendGateSet(gatePathStr, subId, gate, scnDir); - if (message.contains("200 OK")) { - gateMap.put(gatePathStr, gate); - gateCcapMap.put(gatePathStr, thisCcap.getCcapId()); - logger.info("onDataChanged(): created QoS gate {} for {}/{}/{} - {}", - gateId, ccapId, gatePathStr, gate, message); - logger.info("onDataChanged(): created QoS gate {} for {}/{} - {}", - gateId, ccapId, gatePathStr, message); - } else { - logger.info("onDataChanged(): Unable to create QoS gate {} for {}/{}/{} - {}", - gateId, ccapId, gatePathStr, gate, message); - logger.error("onDataChanged(): Unable to create QoS gate {} for {}/{} - {}", - gateId, ccapId, gatePathStr, message); - } - } else { - logger.error("Unable to locate PCMM Service for CCAP - " + thisCcap); - break; - } + } //inputSubId if + else { + // inputSubId is null + if (inputGateId != null) { + gatePathStr = appKey.getAppId() + "/" + inputSubscriberId + "/" + inputGateId; + rpcResponse = gatePathStr + ": Subscriber ID not provided; gate poll not performed"; + } else { + //poll all gates for the appId + + Subscribers subs = app.getSubscribers(); + + logger.info("qospollgates subscribers: " + subs.toString()); + + List subList = subs.getSubscriber(); + logger.info("qospollgates subList: " + subList.toString()); + for (Subscriber sub : subList) { + + //generate active subIid + subscriberId = sub.getSubscriberId(); + InstanceIdentifier subIid = appIid.builder() + .child(Subscribers.class) + .child(Subscriber.class, new SubscriberKey(subscriberId)) + .build(); + + List gateList = sub.getGates().getGate(); + for (Gate gate : gateList) { + //logger.info("qospollgates active gate: "+gate); + + //generate active gateIid + gateId = gate.getGateId(); + InstanceIdentifier gateIid = + subIid.builder().child(Gates.class).child(Gate.class, new GateKey(gateId)).build(); + + opsGate = readGateFromOperationalDatastore(gateIid); + opsCopsGateId = opsGate.getCopsGateId(); + //generate active gatePathStr + gatePathStr = appKey.getAppId() + "/" + subscriberId + "/" + gateId; + if ((!Objects.equals(opsCopsGateId, "")) && (!Objects.equals(opsCopsGateId, null))) { + ccapId = findCcapForSubscriberId(getInetAddress(subscriberId)).getCcapId(); + PCMMService pcmmService = pcmmServiceMap.get(ccapId); + //is the CCAP socket open? + if (!pcmmService.getPcmmPdpSocket() && pcmmService.getPcmmCcapClientIsConnected()) { + PCMMService.GateSendStatus status = pcmmService.sendGateInfo(gatePathStr); + DateAndTime gateDateAndTime = getNowTimeStamp(); + gateOutputError = Collections.singletonList(status.getMessage()); + + + gateBuilder.setGateId(gateId) + .setGatePath(gatePathStr) + .setCcapId(ccapId) + .setCopsGateState( + status.getCopsGateState() + "/" + status.getCopsGateStateReason()) + .setCopsGateTimeInfo(status.getCopsGateTimeInfo()) + .setCopsGateUsageInfo(status.getCopsGateUsageInfo()) + .setCopsGateId(status.getCopsGateId()) + .setError(gateOutputError) + .setTimestamp(gateDateAndTime); + + mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, gateIid, gateBuilder.build()); } else { - logger.error("PCMMService: sendGateSet(): SCN {} not found on CCAP {} for {}/{}", - scn.getValue(), thisCcap, gatePathStr, gate); - message = String.format("404 Not Found - SCN %s not found on CCAP %s for %s", - scn.getValue(), thisCcap.getCcapId(), gatePathStr); + logger.info( + "qospollgates: {}: CCAP socket is down or client disconnected; gate poll not performed", + ccapId); } + } else { + //TODO define what happens if a gate is not active.. is nothing ok + logger.info("qospollgates: {}: gate not active; gate poll not performed", gatePathStr); } - } else { - final String subIdStr = thisData.subId; - message = String.format("404 Not Found - no CCAP found for subscriber %s in %s", - subIdStr, gatePathStr); - logger.info("onDataChanged(): create QoS gate {} FAILED: no CCAP found for subscriber {}: @ {}/{}", - gateId, subIdStr, gatePathStr, gate); - logger.error("onDataChanged(): create QoS gate {} FAILED: no CCAP found for subscriber {}: @ {}", - gateId, subIdStr, gatePathStr); } - } else { - final String subIdStr = thisData.subId; - message = String.format("400 Bad Request - subId must be a valid IP address for subscriber %s in %s", - subIdStr, gatePathStr); - logger.info("onDataChanged(): create QoS gate {} FAILED: subId must be a valid IP address for subscriber {}: @ {}/{}", - gateId, subIdStr, gatePathStr, gate); - logger.error("onDataChanged(): create QoS gate {} FAILED: subId must be a valid IP address for subscriber {}: @ {}", - gateId, subIdStr, gatePathStr); } - // set the response message in the config gate object using a new thread - executor.execute(new Response(dataBroker, entry.getKey(), gate, message)); + rpcResponse = appKey.getAppId() + "/: gate subtree poll in progress"; } } - } - private void onRemove(final InstanceData thisData) { - logger.info("onRemove(): " + thisData); - for (final String gatePathStr: thisData.removePathList) { - if (gateMap.containsKey(gatePathStr)) { - final Gates thisGate = gateMap.remove(gatePathStr); - final String gateId = thisGate.getGateId(); - final String ccapId = gateCcapMap.remove(gatePathStr); - final Ccaps thisCcap = ccapMap.get(ccapId); - final PCMMService service = pcmmServiceMap.get(thisCcap); - if (service != null) { - service.sendGateDelete(thisCcap, gatePathStr); - logger.info("onDataChanged(): removed QoS gate {} for {}/{}/{}: ", gateId, ccapId, gatePathStr, thisGate); - logger.info("onDataChanged(): removed QoS gate {} for {}/{}: ", gateId, ccapId, gatePathStr); - } else - logger.warn("Unable to send to locate PCMMService to send gate delete message with CCAP - " - + thisCcap); - } - } - for (final String ccapIdStr: thisData.removePathList) { - if (ccapMap.containsKey(ccapIdStr)) { - final Ccaps thisCcap = ccapMap.remove(ccapIdStr); - removeCcapFromAllMaps(thisCcap); - } - } - } + DateAndTime rpcDateAndTime = getNowTimeStamp(); - private void onUpdate(final InstanceData oldData) { - logger.info("onUpdate(): " + oldData); - // update operation not allowed -- restore the original config object and complain - if (! oldData.ccapIidMap.isEmpty()) { - for (final Map.Entry, Ccaps> entry : oldData.ccapIidMap.entrySet()) { - final Ccaps ccap = entry.getValue(); - final String ccapId = ccap.getCcapId(); - String message = String.format("405 Method Not Allowed - %s: CCAP update not permitted (use delete); ", - ccapId); - // push new error message onto existing response - message += ccap.getResponse(); - // set the response message in the config object using a new thread -- also restores the original data - executor.execute(new Response(dataBroker, entry.getKey(), ccap, message)); - logger.error("onDataChanged(): CCAP update not permitted {}/{}", ccapId, ccap); - } - } else { - for (final Map.Entry, Gates> entry : oldData.gateIidMap.entrySet()) { - final Gates gate = entry.getValue(); - final String gatePathStr = oldData.gatePath + "/" + gate.getGateId() ; - String message = String.format("405 Method Not Allowed - %s: QoS Gate update not permitted (use delete); ", gatePathStr); - // push new error message onto existing response - message += gate.getResponse(); - // set the response message in the config object using a new thread -- also restores the original data - executor.execute(new Response(dataBroker, entry.getKey(), gate, message)); - logger.error("onDataChanged(): QoS Gate update not permitted: {}/{}", gatePathStr, gate); - } - } + QosPollGatesOutputBuilder outputBuilder = new QosPollGatesOutputBuilder().setTimestamp(rpcDateAndTime) + .setResponse(rpcResponse) + .setGate(gateOutputBuilder.build()); + return Futures.immediateFuture(RpcResultBuilder.success(outputBuilder.build()).build()); } + private DateAndTime getNowTimeStamp() { + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + return new DateAndTime(dateFormat.format(new Date())); + } }