Added response data to operational datastore, refactored data validation
[packetcable.git] / packetcable-policy-server / src / main / java / org / opendaylight / controller / packetcable / provider / PacketcableProvider.java
1 /*
2  * Copyright (c) 2015 CableLabs and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.packetcable.provider;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12
13 import com.google.common.base.Optional;
14 import com.google.common.collect.Maps;
15 import com.google.common.collect.Sets;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import java.net.InetAddress;
18 import java.net.UnknownHostException;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.Executors;
28 import javax.annotation.Nonnull;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
35 import org.opendaylight.controller.packetcable.provider.validation.DataValidator;
36 import org.opendaylight.controller.packetcable.provider.validation.ValidationException;
37 import org.opendaylight.controller.packetcable.provider.validation.Validator;
38 import org.opendaylight.controller.packetcable.provider.validation.impl.CcapsValidatorProviderFactory;
39 import org.opendaylight.controller.packetcable.provider.validation.impl.QosValidatorProviderFactory;
40 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
41 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
42 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpPrefix;
43 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Prefix;
44 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.Ccaps;
45 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.Qos;
46 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.ServiceClassName;
47 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.ServiceFlowDirection;
48 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.ccap.attributes.ConnectionBuilder;
49 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.ccaps.Ccap;
50 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.ccaps.CcapBuilder;
51 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.Apps;
52 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.App;
53 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.AppBuilder;
54 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.AppKey;
55 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.Subscribers;
56 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.SubscribersBuilder;
57 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.subscribers.Subscriber;
58 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.subscribers.SubscriberBuilder;
59 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.subscribers.SubscriberKey;
60 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.subscribers.subscriber.Gates;
61 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.subscribers.subscriber.GatesBuilder;
62 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.subscribers.subscriber.gates.Gate;
63 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.subscribers.subscriber.gates.GateBuilder;
64 import org.opendaylight.yang.gen.v1.urn.packetcable.rev151026.pcmm.qos.gates.apps.app.subscribers.subscriber.gates.GateKey;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.yang.binding.DataObject;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.pcmm.rcd.IPCMMClient;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 /**
73  * Called by ODL framework to start this bundle.
74  * <p>
75  * This class is responsible for processing messages received from ODL's restconf interface.
76  * TODO - Remove some of these state maps and move some of this into the PCMMService
77  */
78 @ThreadSafe
79 public class PacketcableProvider implements BindingAwareProvider, AutoCloseable {
80
81     private static final Logger logger = LoggerFactory.getLogger(PacketcableProvider.class);
82
83     // keys to the /restconf/config/packetcable:ccaps and /restconf/config/packetcable:qos config datastore
84     private static final InstanceIdentifier<Ccaps> ccapsIID = InstanceIdentifier.builder(Ccaps.class).build();
85     private static final InstanceIdentifier<Qos> qosIID = InstanceIdentifier.builder(Qos.class).build();
86
87     // TODO - Revisit these maps and remove the ones no longer necessary
88     private final Map<String, Ccap> ccapMap = new ConcurrentHashMap<>();
89     private final Map<String, Gate> gateMap = new ConcurrentHashMap<>();
90     private final Map<String, String> gateCcapMap = new ConcurrentHashMap<>();
91     private final Map<Subnet, Ccap> subscriberSubnetsMap = new ConcurrentHashMap<>();
92     private final Map<ServiceClassName, List<Ccap>> downstreamScnMap = new ConcurrentHashMap<>();
93     private final Map<ServiceClassName, List<Ccap>> upstreamScnMap = new ConcurrentHashMap<>();
94
95     private final Executor executor = Executors.newSingleThreadExecutor();
96
97     /**
98      * Holds a PCMMService object for each CCAP being managed.
99      */
100     private final Map<String, PCMMService> pcmmServiceMap = new ConcurrentHashMap<>();
101
102     /**
103      * The ODL object used to broker messages throughout the framework
104      */
105     private DataBroker dataBroker;
106     private MdsalUtils mdsalUtils;
107
108     // Data change listeners/registrations
109     private final CcapsDataChangeListener ccapsDataChangeListener = new CcapsDataChangeListener();
110     private final QosDataChangeListener qosDataChangeListener = new QosDataChangeListener();
111
112     private ListenerRegistration<DataChangeListener> ccapsDataChangeListenerRegistration;
113     private ListenerRegistration<DataChangeListener> qosDataChangeListenerRegistration;
114
115     /**
116      * Constructor
117      */
118     public PacketcableProvider() {
119         logger.info("Starting provider");
120     }
121
122     @Override
123     public void onSessionInitiated(ProviderContext session) {
124         logger.info("Packetcable Session Initiated");
125         logger.info("logging levels: error={}, warn={}, info={}, debug={}, trace={}", logger.isErrorEnabled(),
126                 logger.isWarnEnabled(), logger.isInfoEnabled(), logger.isDebugEnabled(), logger.isTraceEnabled());
127
128         dataBroker = session.getSALService(DataBroker.class);
129
130         mdsalUtils = new MdsalUtils(dataBroker);
131
132         ccapsDataChangeListenerRegistration = dataBroker
133                 .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, ccapsIID.child(Ccap.class),
134                         ccapsDataChangeListener, DataBroker.DataChangeScope.SUBTREE);
135
136         qosDataChangeListenerRegistration = dataBroker
137                 .registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, PacketcableProvider.qosIID.child(Apps.class).child(App.class),
138                         qosDataChangeListener, DataBroker.DataChangeScope.SUBTREE);
139
140         // Add empty top level elements
141 //        for (LogicalDatastoreType datastoreType : LogicalDatastoreType.values()) {
142 //            WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
143 //            writeTransaction.put(datastoreType, ccapsIID, new CcapsBuilder().build());
144 //            CheckedFuture<Void, TransactionCommitFailedException> future = writeTransaction.submit();
145 //            try {
146 //                future.checkedGet();
147 //            } catch (TransactionCommitFailedException e) {
148 //                logger.error("Failed to initialise top level ccaps in datastore {}", datastoreType, e);
149 //            }
150 //            writeTransaction = dataBroker.newWriteOnlyTransaction();
151 //            writeTransaction.put(datastoreType, qosIID, new QosBuilder().build());
152 //            future = writeTransaction.submit();
153 //            try {
154 //                future.checkedGet();
155 //            } catch (TransactionCommitFailedException e) {
156 //                logger.error("Failed to initialise top level qos in datastore {}", datastoreType, e);
157 //            }
158 //        }
159
160
161     }
162
163     /**
164      * Implemented from the AutoCloseable interface.
165      */
166     @Override
167     public void close() throws ExecutionException, InterruptedException {
168         if (ccapsDataChangeListenerRegistration != null) {
169             ccapsDataChangeListenerRegistration.close();
170         }
171
172         if (qosDataChangeListenerRegistration != null) {
173             qosDataChangeListenerRegistration.close();
174         }
175     }
176
177     private void updateCcapMaps(final Ccap ccap) {
178         // add ccap to the subscriberSubnets map
179         for (final IpPrefix ipPrefix : ccap.getSubscriberSubnets()) {
180             try {
181                 subscriberSubnetsMap.put(Subnet.createInstance(getIpPrefixStr(ipPrefix)), ccap);
182             } catch (UnknownHostException e) {
183                 logger.error("updateSubscriberSubnets: {}:{} FAILED: {}", ipPrefix, ccap, e.getMessage());
184             }
185         }
186         // ccap to upstream SCN map
187         for (final ServiceClassName scn : ccap.getUpstreamScns()) {
188             if (upstreamScnMap.containsKey(scn)) {
189                 upstreamScnMap.get(scn).add(ccap);
190             } else {
191                 final List<Ccap> ccapList = new ArrayList<>();
192                 ccapList.add(ccap);
193                 upstreamScnMap.put(scn, ccapList);
194             }
195         }
196         // ccap to downstream SCN map
197         for (final ServiceClassName scn : ccap.getDownstreamScns()) {
198             if (downstreamScnMap.containsKey(scn)) {
199                 downstreamScnMap.get(scn).add(ccap);
200             } else {
201                 final List<Ccap> ccapList = new ArrayList<>();
202                 ccapList.add(ccap);
203                 downstreamScnMap.put(scn, ccapList);
204             }
205         }
206     }
207
208     private String getIpPrefixStr(final IpPrefix ipPrefix) {
209         final Ipv4Prefix ipv4 = ipPrefix.getIpv4Prefix();
210         if (ipv4 != null) {
211             return ipv4.getValue();
212         } else {
213             return ipPrefix.getIpv6Prefix().getValue();
214         }
215     }
216
217     public InetAddress getInetAddress(final String subId) {
218         try {
219             return InetAddress.getByName(subId);
220         } catch (UnknownHostException e) {
221             logger.error("getInetAddress: {} FAILED: {}", subId, e.getMessage());
222             return null;
223         }
224     }
225
226     private Ccap findCcapForSubscriberId(final InetAddress inetAddr) {
227         Ccap matchedCcap = null;
228         int longestPrefixLen = -1;
229         for (final Map.Entry<Subnet, Ccap> entry : subscriberSubnetsMap.entrySet()) {
230             final Subnet subnet = entry.getKey();
231             if (subnet.isInNet(inetAddr)) {
232                 int prefixLen = subnet.getPrefixLen();
233                 if (prefixLen > longestPrefixLen) {
234                     matchedCcap = entry.getValue();
235                     longestPrefixLen = prefixLen;
236                 }
237             }
238         }
239         return matchedCcap;
240     }
241
242     private ServiceFlowDirection findScnOnCcap(final ServiceClassName scn, final Ccap ccap) {
243         checkNotNull(scn);
244         checkNotNull(ccap);
245
246         if (upstreamScnMap.containsKey(scn)) {
247             final List<Ccap> ccapList = upstreamScnMap.get(scn);
248             if (ccapList.contains(ccap)) {
249                 return ServiceFlowDirection.Us;
250             }
251         } else if (downstreamScnMap.containsKey(scn)) {
252             final List<Ccap> ccapList = downstreamScnMap.get(scn);
253             if (ccapList.contains(ccap)) {
254                 return ServiceFlowDirection.Ds;
255             }
256         }
257         return null;
258     }
259
260     private void removeCcapFromAllMaps(final Ccap ccap) {
261         // remove the ccap from all maps
262         // subscriberSubnets map
263         for (final Map.Entry<Subnet, Ccap> entry : subscriberSubnetsMap.entrySet()) {
264             if (entry.getValue() == ccap) {
265                 subscriberSubnetsMap.remove(entry.getKey());
266             }
267         }
268         // ccap to upstream SCN map
269         for (final Map.Entry<ServiceClassName, List<Ccap>> entry : upstreamScnMap.entrySet()) {
270             final List<Ccap> ccapList = entry.getValue();
271             ccapList.remove(ccap);
272             if (ccapList.isEmpty()) {
273                 upstreamScnMap.remove(entry.getKey());
274             }
275         }
276         // ccap to downstream SCN map
277         for (final Map.Entry<ServiceClassName, List<Ccap>> entry : downstreamScnMap.entrySet()) {
278             final List<Ccap> ccapList = entry.getValue();
279             ccapList.remove(ccap);
280             if (ccapList.isEmpty()) {
281                 downstreamScnMap.remove(entry.getKey());
282             }
283         }
284
285         final PCMMService service = pcmmServiceMap.remove(ccap.getCcapId());
286         if (service != null) {
287             service.disconect();
288         }
289     }
290
291     // ValidationException does not need to be thrown again
292     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
293     private <T extends DataObject> void saveErrors(@Nonnull Map<InstanceIdentifier<T>, ValidationException> errorMap,
294             @Nonnull Map<InstanceIdentifier<T>, T> dataMap) {
295
296         final WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
297
298
299         for (InstanceIdentifier<T> iid : errorMap.keySet()) {
300
301             final ValidationException exception = errorMap.get(iid);
302             final T badData = dataMap.get(iid);
303
304             if (!badData.getImplementedInterface().isAssignableFrom(iid.getTargetType())) {
305                 // InstanceIdentifier<T> does not have the same type as the DataObject
306                 logger.error("Bad InstanceIdentifier to DataObject mapping, {} : {}", iid, badData);
307                 continue;
308             }
309
310             if (badData instanceof Ccap) {
311                 final Ccap ccap = (Ccap) badData;
312
313                 final Ccap opperationalCcap =
314                         new CcapBuilder().setCcapId(ccap.getCcapId()).setError(exception.getErrorMessages()).build();
315
316
317                 // type match between iid and badData is done at start of loop
318                 @SuppressWarnings("unchecked")
319                 final InstanceIdentifier<Ccap> ccapIID = (InstanceIdentifier<Ccap>) iid;
320                 writeTransaction.put(LogicalDatastoreType.OPERATIONAL, ccapIID, opperationalCcap);
321             }
322             else if (badData instanceof Gate) {
323                 final Gate gate = (Gate) badData;
324
325                 final Gate operationalGate =
326                         new GateBuilder()
327                         .setGateId(gate.getGateId())
328                         .setError(exception.getErrorMessages())
329                         .build();
330
331                 final Gates operationalGates = new GatesBuilder()
332                         .setGate(Collections.singletonList(operationalGate))
333                         .build();
334
335                 final SubscriberKey subscriberKey = InstanceIdentifier.keyOf(iid.firstIdentifierOf(Subscriber.class));
336                 final Subscriber operationalSubscriber = new SubscriberBuilder()
337                         .setSubscriberId(subscriberKey.getSubscriberId())
338                         .setGates(operationalGates)
339                         .build();
340
341                 final Subscribers operationalSubscribers = new SubscribersBuilder()
342                         .setSubscriber(Collections.singletonList(operationalSubscriber))
343                         .build();
344
345                 final InstanceIdentifier<App> appIID = iid.firstIdentifierOf(App.class);
346                 final AppKey appKey = InstanceIdentifier.keyOf(appIID);
347                 final App operationalApp = new AppBuilder()
348                         .setAppId(appKey.getAppId())
349                         .setSubscribers(operationalSubscribers)
350                         .build();
351
352
353                 writeTransaction.put(LogicalDatastoreType.OPERATIONAL, appIID, operationalApp);
354             }
355             else {
356                 // If you get here a developer forgot to add a type above
357                 logger.error("Unexpected type requested for error saving: {}", badData);
358                 throw new IllegalStateException("Unsupported type for error saving");
359             }
360
361         }
362
363
364         CheckedFuture<Void, TransactionCommitFailedException> future = writeTransaction.submit();
365
366         try {
367             future.checkedGet();
368         } catch (TransactionCommitFailedException e) {
369             logger.error("Failed to write errors to operational datastore", e);
370         }
371     }
372
373     /**
374      * Removes Ccaps if all Ccap instances are removed
375      */
376     private class CcapsCleaner extends AbstractCleaner<Ccaps> {
377
378         public CcapsCleaner(final InstanceIdentifier<?> removedIID) {
379             super(removedIID, Ccaps.class, LogicalDatastoreType.OPERATIONAL);
380         }
381
382         @Override
383         protected boolean shouldClean(final Ccaps ccaps) {
384             return ccaps.getCcap().isEmpty();
385         }
386     }
387
388     /**
389      * Removes Subscriber if all Gate instances are removed
390      */
391     private class SubscriberCleaner extends AbstractCleaner<Subscriber> {
392
393         public SubscriberCleaner(InstanceIdentifier<Gate> removedGateIID) {
394             super(removedGateIID, Subscriber.class, LogicalDatastoreType.OPERATIONAL);
395         }
396
397         @Override
398         protected boolean shouldClean(final Subscriber subscriber) {
399             return subscriber.getGates().getGate().isEmpty();
400         }
401
402         @Override
403         protected void postRemove(InstanceIdentifier<Subscriber> subscriberIID) {
404             executor.execute(new AppCleaner(subscriberIID));
405         }
406     }
407
408
409     /**
410      * Removes App if all Subscribers are removed.
411      */
412     private class AppCleaner extends AbstractCleaner<App> {
413
414         public AppCleaner(InstanceIdentifier<Subscriber> removedSubscriberIID) {
415             super(removedSubscriberIID, App.class, LogicalDatastoreType.OPERATIONAL);
416         }
417
418         @Override
419         boolean shouldClean(final App app) {
420             return app.getSubscribers().getSubscriber().isEmpty();
421         }
422
423         @Override
424         void postRemove(final InstanceIdentifier<App> appIID) {
425             executor.execute(new AppsCleaner(appIID));
426         }
427     }
428
429
430     /**
431      * Removes Apps if all App instances are removed.
432      */
433     private class AppsCleaner extends  AbstractCleaner<Apps> {
434
435         public AppsCleaner(InstanceIdentifier<App> removedAppIID) {
436             super(removedAppIID, Apps.class, LogicalDatastoreType.OPERATIONAL);
437         }
438
439         @Override
440         protected boolean shouldClean(final Apps apps) {
441             return apps.getApp().isEmpty();
442         }
443     }
444
445
446     /**
447      * Helper class to do the heavy lifting in removing object. Lets subclasses decide with
448      *  {@link #shouldClean(DataObject)}. <br>
449      *
450      * Subclasses can react after an instance is removed by overriding {@link #postRemove(InstanceIdentifier)}
451      * @param <T> The type that will be removed
452      */
453     private abstract class AbstractCleaner <T extends DataObject> implements Runnable {
454         final InstanceIdentifier<?> removedIID;
455         final Class<T> tClass;
456         final LogicalDatastoreType datastoreType;
457
458         public AbstractCleaner(InstanceIdentifier<?> removedIID, Class<T> tClass, LogicalDatastoreType datastoreType) {
459             this.removedIID = checkNotNull(removedIID);
460             this.tClass = checkNotNull(tClass);
461             this.datastoreType = checkNotNull(datastoreType);
462         }
463
464         @Override
465         public void run() {
466             InstanceIdentifier<T> tIID = removedIID.firstIdentifierOf(tClass);
467             if (tIID != null) {
468                 Optional<T> optional = mdsalUtils.read(datastoreType, tIID);
469                 if (optional.isPresent()) {
470
471                     if (shouldClean(optional.get())) {
472                         if (mdsalUtils.delete(datastoreType, tIID)) {
473                             postRemove(tIID);
474                         }
475                         else {
476                             removeFailed(tIID);
477                         }
478                     }
479
480                 }
481             }
482             else {
483                 logger.error("Expected to find InstanceIdentifier<{}> but was not found: {}",
484                         tClass.getSimpleName(), removedIID);
485             }
486         }
487
488         /**
489          * If returns true the object will be removed from the datastore
490          * @param object The object that might be removed.
491          * @return true if it should be removed.
492          */
493         abstract boolean shouldClean(final T object);
494
495         /**
496          * Called after an instance is removed.
497          * @param tIID the InstanceIdentifier of the removed object
498          */
499         void postRemove(InstanceIdentifier<T> tIID) {
500
501         }
502
503         void removeFailed(InstanceIdentifier<T> tIID) {
504             logger.error("Failed to remove {}", tIID);
505         }
506     }
507
508
509     /**
510      * Listener for the packetcable:ccaps tree
511      */
512     private class CcapsDataChangeListener extends AbstractDataChangeListener<Ccap> {
513
514         private final DataValidator ccapsDataValidator = new DataValidator(new CcapsValidatorProviderFactory().build());
515
516         private final Set<InstanceIdentifier<Ccap>> updateQueue = Sets.newConcurrentHashSet();
517
518         public CcapsDataChangeListener() {
519             super(Ccap.class);
520         }
521
522         @Override
523         protected void handleCreatedData(final Map<InstanceIdentifier<Ccap>, Ccap> createdCcaps) {
524             if (createdCcaps.isEmpty()) {
525                 return;
526             }
527
528             final Map<InstanceIdentifier<Ccap>, ValidationException> errorMap =
529                     ccapsDataValidator.validateOneType(createdCcaps, Validator.Extent.NODE_AND_SUBTREE);
530
531             // validate all new objects an update operational datastore
532             if (!errorMap.isEmpty()) {
533                 // bad data write errors to operational datastore
534                 saveErrors(errorMap, createdCcaps);
535             }
536
537             if (createdCcaps.size() > errorMap.size()) {
538                 final Map<InstanceIdentifier<Ccap>, Ccap> goodData =
539                         Maps.newHashMapWithExpectedSize(createdCcaps.size() - errorMap.size());
540                 for (InstanceIdentifier<Ccap> iid : createdCcaps.keySet()) {
541                     if (!errorMap.containsKey(iid)) {
542                         goodData.put(iid, createdCcaps.get(iid));
543                     }
544                 }
545                 addNewCcaps(goodData);
546             }
547         }
548
549         private void addNewCcaps(final Map<InstanceIdentifier<Ccap>, Ccap> goodData) {
550             for (InstanceIdentifier<Ccap> iid : goodData.keySet()) {
551                 final Ccap ccap = goodData.get(iid);
552
553                 // add service
554                 if (pcmmServiceMap.containsKey(ccap.getCcapId())) {
555                     logger.error("Already monitoring CCAP - " + ccap);
556                     continue;
557                 }
558                 final PCMMService pcmmService = new PCMMService(IPCMMClient.CLIENT_TYPE, ccap);
559                 // TODO - may want to use the AMID but for the client type but probably not???
560 /*
561                             final PCMMService pcmmService = new PCMMService(
562                                     thisCcap.getAmId().getAmType().shortValue(), thisCcap);
563 */
564                 ConnectionBuilder connectionBuilder = new ConnectionBuilder();
565                 String message = pcmmService.addCcap();
566                 if (message.contains("200 OK")) {
567                     pcmmServiceMap.put(ccap.getCcapId(), pcmmService);
568                     ccapMap.put(ccap.getCcapId(), ccap);
569                     updateCcapMaps(ccap);
570                     logger.info("Created CCAP: {}/{} : {}", iid, ccap, message);
571                     logger.info("Created CCAP: {} : {}", iid, message);
572                     connectionBuilder.setConnected(true).setError(Collections.<String>emptyList());
573                 } else {
574                     logger.error("Create CCAP Failed: {} : {}", iid, message);
575
576                     connectionBuilder.setConnected(false).setError(Collections.singletonList(message));
577                 }
578
579                 Optional<Ccap> optionalCcap = mdsalUtils.read(LogicalDatastoreType.OPERATIONAL, iid);
580
581                 final CcapBuilder responseCcapBuilder;
582                 if (optionalCcap.isPresent()) {
583                     responseCcapBuilder = new CcapBuilder(optionalCcap.get());
584                 } else {
585                     responseCcapBuilder = new CcapBuilder();
586                     responseCcapBuilder.setCcapId(ccap.getCcapId());
587                 }
588
589                 responseCcapBuilder.setConnection(connectionBuilder.build());
590
591                 mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, iid, responseCcapBuilder.build());
592             }
593
594         }
595
596         @Override
597         protected void handleUpdatedData(final Map<InstanceIdentifier<Ccap>, Ccap> updatedCcaps,
598                 final Map<InstanceIdentifier<Ccap>, Ccap> originalCcaps) {
599
600             // TODO actually support updates
601
602             // update operation not allowed -- restore the original config object and complain
603             for (final Map.Entry<InstanceIdentifier<Ccap>, Ccap> entry : updatedCcaps.entrySet()) {
604                 if (!originalCcaps.containsKey(entry.getKey())) {
605                     logger.error("No original data found for supposedly updated data: {}", entry.getValue());
606                     continue;
607                 }
608
609                 // If this notification is coming from our modification ignore it.
610                 if (updateQueue.contains(entry.getKey())) {
611                     updateQueue.remove(entry.getKey());
612                     continue;
613                 }
614
615                 final Ccap originalCcap = originalCcaps.get(entry.getKey());
616                 //final Ccap updatedCcap = entry.getValue();
617
618                 // restore the original data
619                 updateQueue.add(entry.getKey());
620                 mdsalUtils.put(LogicalDatastoreType.CONFIGURATION, entry.getKey(), originalCcap);
621                 logger.error("CCAP update not permitted {}", entry.getKey());
622             }
623         }
624
625         @Override
626         protected void handleRemovedData(final Set<InstanceIdentifier<Ccap>> removedCcapPaths,
627                 final Map<InstanceIdentifier<Ccap>, Ccap> originalCcaps) {
628
629             for (InstanceIdentifier<Ccap> iid : removedCcapPaths) {
630                 final Ccap nukedCcap = originalCcaps.get(iid);
631                 removeCcapFromAllMaps(nukedCcap);
632
633                 mdsalUtils.delete(LogicalDatastoreType.OPERATIONAL, iid);
634
635                 // clean up ccaps level if it is now empty
636                 executor.execute(new CcapsCleaner(iid));
637             }
638
639         }
640     }
641
642
643     private class QosDataChangeListener extends AbstractDataChangeListener<Gate> {
644
645         private final DataValidator qosDataValidator = new DataValidator(new QosValidatorProviderFactory().build());
646         private final Set<InstanceIdentifier<Gate>> updateQueue = Sets.newConcurrentHashSet();
647
648         public QosDataChangeListener() {
649             super(Gate.class);
650         }
651
652         @Override
653         protected void handleCreatedData(final Map<InstanceIdentifier<Gate>, Gate> createdData) {
654
655             final Map<InstanceIdentifier<Gate>, ValidationException> errorMap =
656                     qosDataValidator.validateOneType(createdData, Validator.Extent.NODE_AND_SUBTREE);
657
658             // validate all new objects an update operational datastore
659             if (!errorMap.isEmpty()) {
660                 // bad data write errors to operational datastore
661                 saveErrors(errorMap, createdData);
662             }
663
664             if (createdData.size() > errorMap.size()) {
665                 final Map<InstanceIdentifier<Gate>, Gate> goodData =
666                         Maps.newHashMapWithExpectedSize(createdData.size() - errorMap.size());
667                 for (InstanceIdentifier<Gate> iid : createdData.keySet()) {
668                     if (!errorMap.containsKey(iid)) {
669                         goodData.put(iid, createdData.get(iid));
670                     }
671                 }
672                 addNewGates(goodData);
673             }
674
675         }
676
677         private void addNewGates(final Map<InstanceIdentifier<Gate>, Gate> createdGates) {
678
679             for (InstanceIdentifier<Gate> gateIID : createdGates.keySet()) {
680                 final Gate newGate = createdGates.get(gateIID);
681
682                 final String newGatePathStr = makeGatePathString(gateIID);
683
684                 final InstanceIdentifier<Subscriber> subscriberIID = gateIID.firstIdentifierOf(Subscriber.class);
685                 final SubscriberKey subscriberKey = InstanceIdentifier.keyOf(subscriberIID);
686                 final InetAddress subscriberAddr = getInetAddress(subscriberKey.getSubscriberId());
687                 if (subscriberAddr == null) {
688                     final String msg = String.format("subscriberId must be a valid ipaddress: %s",
689                             subscriberKey.getSubscriberId());
690                     logger.error(msg);
691                     saveGateError(gateIID, newGatePathStr, msg);
692                     continue;
693                 }
694
695                 final Ccap ccap = findCcapForSubscriberId(subscriberAddr);
696                 if (ccap == null) {
697                     final String msg = String.format("Unable to find Ccap for subscriber %s: @ %s",
698                             subscriberKey.getSubscriberId(), newGatePathStr);
699                     logger.error(msg);
700                     saveGateError(gateIID, newGatePathStr, msg);
701                     continue;
702                 }
703
704                 final ServiceClassName scn = newGate.getTrafficProfile().getServiceClassName();
705                 final ServiceFlowDirection scnDirection = findScnOnCcap(scn, ccap);
706                 if (scnDirection == null) {
707                     final String msg = String.format("SCN %s not found on CCAP %s for %s",
708                             scn, ccap.getCcapId(), newGatePathStr);
709                     logger.error(msg);
710                     saveGateError(gateIID, newGatePathStr, msg);
711                     continue;
712                 }
713
714                 final PCMMService pcmmService = pcmmServiceMap.get(ccap.getCcapId());
715                 if (pcmmService == null) {
716                     final String msg = String.format("Unable to locate PCMM Service for CCAP: %s ; with subscriber: %s",
717                             ccap, subscriberKey.getSubscriberId());
718                     logger.error(msg);
719                     saveGateError(gateIID, newGatePathStr, msg);
720                     continue;
721                 }
722
723                 PCMMService.GateSetStatus status = pcmmService.sendGateSet(newGatePathStr, subscriberAddr, newGate, scnDirection);
724                 gateMap.put(newGatePathStr, newGate);
725                 gateCcapMap.put(newGatePathStr, ccap.getCcapId());
726
727                 final GateBuilder gateBuilder = new GateBuilder();
728                 gateBuilder.setGateId(newGate.getGateId())
729                         .setGatePath(newGatePathStr)
730                         .setCcapId(ccap.getCcapId())
731                         .setCopsGateId(status.getCopsGateId())
732                         .setCopsState(status.didSucceed() ? "success" : "failure");
733                 if (!status.didSucceed()) {
734                     gateBuilder.setError(Collections.singletonList(status.getMessage()));
735                 }
736
737                 Gate operationalGate = gateBuilder.build();
738
739                 mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, gateIID, operationalGate);
740
741             }
742
743         }
744
745         private void saveGateError(@Nonnull final InstanceIdentifier<Gate> gateIID, @Nonnull final String gatePathStr,
746                 @Nonnull final String error) {
747             checkNotNull(gateIID);
748             checkNotNull(error);
749
750             final GateBuilder gateBuilder = new GateBuilder();
751             gateBuilder.setGateId(InstanceIdentifier.keyOf(gateIID).getGateId())
752                     .setGatePath(gatePathStr)
753                     .setCopsGateId("")
754                     .setCopsState("N/A");
755
756                 gateBuilder.setError(Collections.singletonList(error));
757
758             Gate operationalGate = gateBuilder.build();
759
760             mdsalUtils.put(LogicalDatastoreType.OPERATIONAL, gateIID, operationalGate);
761         }
762
763         @Override
764         protected void handleUpdatedData(final Map<InstanceIdentifier<Gate>, Gate> updatedData,
765                 final Map<InstanceIdentifier<Gate>, Gate> originalData) {
766             // TODO actually support updates
767
768             // update operation not allowed -- restore the original config object and complain
769             for (final Map.Entry<InstanceIdentifier<Gate>, Gate> entry : updatedData.entrySet()) {
770                 if (!originalData.containsKey(entry.getKey())) {
771                     logger.error("No original data found for supposedly updated data: {}", entry.getValue());
772                     continue;
773                 }
774
775                 // If this notification is coming from our modification ignore it.
776                 if (updateQueue.contains(entry.getKey())) {
777                     updateQueue.remove(entry.getKey());
778                     continue;
779                 }
780
781                 final Gate originalGate = originalData.get(entry.getKey());
782
783                 // restores the original data
784                 updateQueue.add(entry.getKey());
785                 mdsalUtils.put(LogicalDatastoreType.CONFIGURATION, entry.getKey(), originalGate);
786                 logger.error("Update not permitted {}", entry.getKey());
787
788             }
789         }
790
791
792
793         @Override
794         protected void handleRemovedData(final Set<InstanceIdentifier<Gate>> removedPaths,
795                 final Map<InstanceIdentifier<Gate>, Gate> originalData) {
796
797             for (final InstanceIdentifier<Gate> removedGateIID : removedPaths) {
798
799                 mdsalUtils.delete(LogicalDatastoreType.OPERATIONAL, removedGateIID);
800                 //TODO check if this was the last gate for this app/subscriber and if so delete them
801
802                 executor.execute(new SubscriberCleaner(removedGateIID));
803
804                 final String gatePathStr = makeGatePathString(removedGateIID);
805
806                     if (gateMap.containsKey(gatePathStr)) {
807                         final Gate thisGate = gateMap.remove(gatePathStr);
808                         final String gateId = thisGate.getGateId();
809                         final String ccapId = gateCcapMap.remove(gatePathStr);
810                         final Ccap thisCcap = ccapMap.get(ccapId);
811                         final PCMMService service = pcmmServiceMap.get(thisCcap.getCcapId());
812                         if (service != null) {
813                             service.sendGateDelete(gatePathStr);
814                             logger.info("onDataChanged(): removed QoS gate {} for {}/{}/{}: ", gateId, ccapId, gatePathStr,
815                                     thisGate);
816                         } else {
817                             logger.warn(
818                                     "Unable to send to locate PCMMService to send gate delete message with CCAP - " + thisCcap);
819                         }
820                     }
821
822
823             }
824
825         }
826
827         private String makeGatePathString(InstanceIdentifier<Gate> iid) {
828             final InstanceIdentifier<App> appIID = iid.firstIdentifierOf(App.class);
829             final AppKey appKey = InstanceIdentifier.keyOf(appIID);
830
831             final InstanceIdentifier<Subscriber> subscriberIID = iid.firstIdentifierOf(Subscriber.class);
832             final SubscriberKey subscriberKey = InstanceIdentifier.keyOf(subscriberIID);
833
834             final GateKey gateKey = InstanceIdentifier.keyOf(iid);
835
836             return appKey.getAppId()
837                     + "/" + subscriberKey.getSubscriberId()
838                     + "/" + gateKey.getGateId();
839         }
840     }
841
842 }