Bump versions by 0.1.0 for next dev cycle
[vpnservice.git] / mdsalutil / mdsalutil-impl / src / main / java / org / opendaylight / vpnservice / mdsalutil / internal / MDSALManager.java
1 /*
2  * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.vpnservice.mdsalutil.internal;
10
11 import java.math.BigInteger;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18
19 import org.opendaylight.vpnservice.mdsalutil.ActionInfo;
20 import org.opendaylight.vpnservice.mdsalutil.ActionType;
21 import org.opendaylight.vpnservice.mdsalutil.FlowEntity;
22 import org.opendaylight.vpnservice.mdsalutil.GroupEntity;
23 import org.opendaylight.vpnservice.mdsalutil.MDSALUtil;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.opendaylight.vpnservice.mdsalutil.*;
51 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
52 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
53 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
54 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
55 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
56 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
57 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
58
59 import com.google.common.util.concurrent.CheckedFuture;
60 import com.google.common.util.concurrent.FutureCallback;
61 import com.google.common.util.concurrent.Futures;
62
63 public class MDSALManager implements AutoCloseable {
64
65     private static final Logger s_logger = LoggerFactory.getLogger(MDSALManager.class);
66
67     private DataBroker m_dataBroker;
68
69     private PacketProcessingService m_packetProcessingService;
70     private ListenerRegistration<DataChangeListener> groupListenerRegistration;
71     private ListenerRegistration<DataChangeListener> flowListenerRegistration;
72     private ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<FlowInfoKey, Runnable>();
73     private ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<GroupInfoKey, Runnable> ();
74     private ExecutorService executorService = Executors.newSingleThreadExecutor();
75
/**
 * Creates the manager used by other VPN service modules to write flows and
 * groups to the MD-SAL datastore and to send packet-outs.
 * The two collaborators are stored and the flow and group data-change
 * listeners are registered on the supplied broker before the constructor returns.
 *
 * @param db data broker used for datastore transactions and listener registration
 * @param pktProcService packet processing service used to transmit packet-outs
 * @throws IllegalStateException if registering a data-change listener fails
 */
public MDSALManager(final DataBroker db, PacketProcessingService pktProcService) {
    this.m_packetProcessingService = pktProcService;
    this.m_dataBroker = db;
    registerListener(db);
    s_logger.info("MDSAL Manager Initialized ");
}
91
92     @Override
93     public void close() throws Exception {
94         groupListenerRegistration.close();
95         flowListenerRegistration.close();
96         s_logger.info("MDSAL Manager Closed");
97     }
98
99     private void registerListener(DataBroker db) {
100         try {
101             flowListenerRegistration = db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, getWildCardFlowPath(),
102                                                                         new FlowListener(),
103                                                                         DataChangeScope.SUBTREE);
104             groupListenerRegistration = db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, getWildCardGroupPath(),
105                                                                         new GroupListener(),
106                                                                         DataChangeScope.SUBTREE);
107         } catch (final Exception e) {
108             s_logger.error("GroupEventHandler: DataChange listener registration fail!", e);
109             throw new IllegalStateException("GroupEventHandler: registration Listener failed.", e);
110         }
111     }
112
113     private InstanceIdentifier<Group> getWildCardGroupPath() {
114         return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).child(Group.class);
115     }
116
117     private InstanceIdentifier<Flow> getWildCardFlowPath() {
118         return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class);
119     }
120
121     public void installFlow(FlowEntity flowEntity) {
122
123         try {
124             s_logger.trace("InstallFlow for flowEntity {} ", flowEntity);
125
126             if (flowEntity.getCookie() == null) {
127                flowEntity.setCookie(new BigInteger("0110000", 16));
128             }
129
130             FlowKey flowKey = new FlowKey( new FlowId(flowEntity.getFlowId()) );
131
132             FlowBuilder flowbld = flowEntity.getFlowBuilder();
133
134             Node nodeDpn = buildDpnNode(flowEntity.getDpnId());
135             InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
136                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
137                     .child(Table.class, new TableKey(flowEntity.getTableId())).child(Flow.class,flowKey).build();
138
139             WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
140
141             modification.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flowbld.build(),true );
142
143             CheckedFuture<Void,TransactionCommitFailedException> submitFuture  = modification.submit();
144
145             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
146
147                 @Override
148                 public void onSuccess(final Void result) {
149                     // Commited successfully
150                     s_logger.debug( "Install Flow -- Committedsuccessfully ") ;
151                 }
152
153                 @Override
154                 public void onFailure(final Throwable t) {
155                     // Transaction failed
156
157                     if(t instanceof OptimisticLockFailedException) {
158                         // Failed because of concurrent transaction modifying same data
159                         s_logger.error( "Install Flow -- Failed because of concurrent transaction modifying same data ") ;
160                     } else {
161                        // Some other type of TransactionCommitFailedException
162                         s_logger.error( "Install Flow -- Some other type of TransactionCommitFailedException " + t) ;
163                     }
164                 }
165             });
166         } catch (Exception e) {
167             s_logger.error("Could not install flow: {}", flowEntity, e);
168         }
169     }
170
171     public CheckedFuture<Void,TransactionCommitFailedException> installFlow(BigInteger dpId, Flow flow) {
172         FlowKey flowKey = new FlowKey( new FlowId(flow.getId()) );
173         Node nodeDpn = buildDpnNode(dpId);
174         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
175                 .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
176                 .child(Table.class, new TableKey(flow.getTableId())).child(Flow.class,flowKey).build();
177         WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
178         modification.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow, true);
179         return modification.submit();
180     }
181
182     public void installGroup(GroupEntity groupEntity) {
183         try {
184             Group group = groupEntity.getGroupBuilder().build();
185
186             Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
187
188             InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
189                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
190                     .child(Group.class, new GroupKey(new GroupId(groupEntity.getGroupId()))).build();
191
192             WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
193
194             modification.put(LogicalDatastoreType.CONFIGURATION, groupInstanceId, group, true);
195
196             CheckedFuture<Void,TransactionCommitFailedException> submitFuture  = modification.submit();
197
198             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
199                 @Override
200                 public void onSuccess(final Void result) {
201                     // Commited successfully
202                     s_logger.debug( "Install Group -- Committedsuccessfully ") ;
203                 }
204
205                 @Override
206                 public void onFailure(final Throwable t) {
207                     // Transaction failed
208
209                     if(t instanceof OptimisticLockFailedException) {
210                         // Failed because of concurrent transaction modifying same data
211                         s_logger.error( "Install Group -- Failed because of concurrent transaction modifying same data ") ;
212                     } else {
213                        // Some other type of TransactionCommitFailedException
214                         s_logger.error( "Install Group -- Some other type of TransactionCommitFailedException " + t) ;
215                     }
216                 }
217              });
218            } catch (Exception e) {
219             s_logger.error("Could not install Group: {}", groupEntity, e);
220             throw e;
221         }
222     }
223
224     public void removeFlow(FlowEntity flowEntity) {
225         try {
226             s_logger.debug("Remove flow {}",flowEntity);
227             Node nodeDpn = buildDpnNode(flowEntity.getDpnId());
228             FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
229             InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
230                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
231                     .child(Table.class, new TableKey(flowEntity.getTableId())).child(Flow.class, flowKey).build();
232
233
234                 WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
235                 modification.delete(LogicalDatastoreType.CONFIGURATION,flowInstanceId);
236
237                 CheckedFuture<Void,TransactionCommitFailedException> submitFuture  = modification.submit();
238
239                 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
240                     @Override
241                     public void onSuccess(final Void result) {
242                         // Commited successfully
243                         s_logger.debug( "Delete Flow -- Committedsuccessfully ") ;
244                     }
245
246                     @Override
247                     public void onFailure(final Throwable t) {
248                         // Transaction failed
249                         if(t instanceof OptimisticLockFailedException) {
250                             // Failed because of concurrent transaction modifying same data
251                             s_logger.error( "Delete Flow -- Failed because of concurrent transaction modifying same data ") ;
252                         } else {
253                            // Some other type of TransactionCommitFailedException
254                             s_logger.error( "Delete Flow -- Some other type of TransactionCommitFailedException " + t) ;
255                         }
256                     }
257
258                 });
259         } catch (Exception e) {
260             s_logger.error("Could not remove Flow: {}", flowEntity, e);
261         }
262     }
263
264     public CheckedFuture<Void,TransactionCommitFailedException> removeFlowNew(BigInteger dpnId, Flow flowEntity) {
265         s_logger.debug("Remove flow {}",flowEntity);
266         Node nodeDpn = buildDpnNode(dpnId);
267                 //FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getId()));
268         FlowKey flowKey = new FlowKey(flowEntity.getId());
269         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
270                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
271                     .child(Table.class, new TableKey(flowEntity.getTableId())).child(Flow.class, flowKey).build();
272         WriteTransaction  modification = m_dataBroker.newWriteOnlyTransaction();
273         modification.delete(LogicalDatastoreType.CONFIGURATION,flowInstanceId );
274         return modification.submit();
275     }
276
277     public void removeGroup(GroupEntity groupEntity) {
278         try {
279             Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
280             InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
281                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
282                     .child(Group.class, new GroupKey(new GroupId(groupEntity.getGroupId()))).build();
283
284             WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
285
286             modification.delete(LogicalDatastoreType.CONFIGURATION,groupInstanceId );
287
288             CheckedFuture<Void,TransactionCommitFailedException> submitFuture  = modification.submit();
289
290             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
291                 @Override
292                 public void onSuccess(final Void result) {
293                     // Commited successfully
294                     s_logger.debug( "Install Group -- Committedsuccessfully ") ;
295                 }
296
297                 @Override
298                 public void onFailure(final Throwable t) {
299                     // Transaction failed
300                     if(t instanceof OptimisticLockFailedException) {
301                         // Failed because of concurrent transaction modifying same data
302                         s_logger.error( "Install Group -- Failed because of concurrent transaction modifying same data ") ;
303                     } else {
304                        // Some other type of TransactionCommitFailedException
305                         s_logger.error( "Install Group -- Some other type of TransactionCommitFailedException " + t) ;
306                     }
307                 }
308             });
309         } catch (Exception e) {
310             s_logger.error("Could not remove Group: {}", groupEntity, e);
311         }
312     }
313
314     public void modifyGroup(GroupEntity groupEntity) {
315
316         installGroup(groupEntity);
317     }
318
319     public void sendPacketOut(BigInteger dpnId, int groupId, byte[] payload) {
320
321         List<ActionInfo> actionInfos = new ArrayList<ActionInfo>();
322         actionInfos.add(new ActionInfo(ActionType.group, new String[] { String.valueOf(groupId) }));
323
324         sendPacketOutWithActions(dpnId, groupId, payload, actionInfos);
325     }
326
327     public void sendPacketOutWithActions(BigInteger dpnId, long groupId, byte[] payload, List<ActionInfo> actionInfos) {
328
329         m_packetProcessingService.transmitPacket(MDSALUtil.getPacketOut(actionInfos, payload, dpnId,
330                 getNodeConnRef("openflow:" + dpnId, "0xfffffffd")));
331     }
332
333     public void sendARPPacketOutWithActions(BigInteger dpnId, byte[] payload, List<ActionInfo> actions) {
334         m_packetProcessingService.transmitPacket(MDSALUtil.getPacketOut(actions, payload, dpnId,
335                 getNodeConnRef("openflow:" + dpnId, "0xfffffffd")));
336     }
337
338     public InstanceIdentifier<Node> nodeToInstanceId(Node node) {
339         return InstanceIdentifier.builder(Nodes.class).child(Node.class, node.getKey()).toInstance();
340     }
341
342     private static NodeConnectorRef getNodeConnRef(final String nodeId, final String port) {
343         StringBuilder _stringBuilder = new StringBuilder(nodeId);
344         StringBuilder _append = _stringBuilder.append(":");
345         StringBuilder sBuild = _append.append(port);
346         String _string = sBuild.toString();
347         NodeConnectorId _nodeConnectorId = new NodeConnectorId(_string);
348         NodeConnectorKey _nodeConnectorKey = new NodeConnectorKey(_nodeConnectorId);
349         NodeConnectorKey nConKey = _nodeConnectorKey;
350         InstanceIdentifierBuilder<Nodes> _builder = InstanceIdentifier.<Nodes> builder(Nodes.class);
351         NodeId _nodeId = new NodeId(nodeId);
352         NodeKey _nodeKey = new NodeKey(_nodeId);
353         InstanceIdentifierBuilder<Node> _child = _builder.<Node, NodeKey> child(Node.class, _nodeKey);
354         InstanceIdentifierBuilder<NodeConnector> _child_1 = _child.<NodeConnector, NodeConnectorKey> child(
355                 NodeConnector.class, nConKey);
356         InstanceIdentifier<NodeConnector> path = _child_1.toInstance();
357         NodeConnectorRef _nodeConnectorRef = new NodeConnectorRef(path);
358         return _nodeConnectorRef;
359     }
360
361     private Node buildDpnNode(BigInteger dpnId) {
362         NodeId nodeId = new NodeId("openflow:" + dpnId);
363         Node nodeDpn = new NodeBuilder().setId(nodeId).setKey(new NodeKey(nodeId)).build();
364
365         return nodeDpn;
366     }
367
368     public void syncSetUpFlow(FlowEntity flowEntity, long delay, boolean isRemove) {
369         s_logger.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
370         if (flowEntity.getCookie() == null) {
371             flowEntity.setCookie(new BigInteger("0110000", 16));
372         }
373         Flow flow = flowEntity.getFlowBuilder().build();
374         String flowId = flowEntity.getFlowId();
375         BigInteger dpId = flowEntity.getDpnId();
376         short tableId = flowEntity.getTableId();
377         Match matches = flow.getMatch();
378         FlowKey flowKey = new FlowKey( new FlowId(flowId));
379         Node nodeDpn = buildDpnNode(dpId);
380         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
381                 .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
382                 .child(Table.class, new TableKey(flow.getTableId())).child(Flow.class, flowKey).build();
383         Runnable notifyTask = new NotifyTask();
384         FlowInfoKey flowInfoKey = new FlowInfoKey(dpId, tableId, matches, flowId);
385         synchronized (flowInfoKey.toString().intern()) {
386             flowMap.put(flowInfoKey, notifyTask);
387             if (isRemove) {
388                 MDSALUtil.syncDelete(m_dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
389             } else {
390                 MDSALUtil.syncWrite(m_dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
391             }
392             synchronized (notifyTask) {
393                 try {
394                     notifyTask.wait(delay);
395                 } catch (InterruptedException e){}
396             }
397         }
398     }
399
400     public void syncSetUpGroup(GroupEntity groupEntity, long delayTime, boolean isRemove) {
401         s_logger.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
402         Group group = groupEntity.getGroupBuilder().build();
403         BigInteger dpId = groupEntity.getDpnId();
404         Node nodeDpn = buildDpnNode(dpId);
405         long groupId = groupEntity.getGroupId();
406         GroupKey groupKey = new GroupKey(new GroupId(groupId));
407         InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
408                 .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
409                 .child(Group.class, groupKey).build();
410         Runnable notifyTask = new NotifyTask();
411         GroupInfoKey groupInfoKey = new GroupInfoKey(dpId, groupId);
412         synchronized (groupInfoKey.toString().intern()) {
413             s_logger.trace("syncsetupGroupKey groupKey {}", groupInfoKey);
414             groupMap.put(groupInfoKey, notifyTask);
415             if (isRemove) {
416                 MDSALUtil.syncDelete(m_dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
417             } else {
418                 MDSALUtil.syncWrite(m_dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
419             }
420             synchronized (notifyTask) {
421                 try {
422                     notifyTask.wait(delayTime);
423                 } catch (InterruptedException e){}
424             }
425         }
426     }
427
428     public void syncSetUpGroup(BigInteger dpId, Group group, long delayTime, boolean isRemove) {
429         s_logger.trace("syncSetUpGroup for group {} ", group);
430         Node nodeDpn = buildDpnNode(dpId);
431         long groupId = group.getGroupId().getValue();
432         GroupKey groupKey = new GroupKey(new GroupId(groupId));
433         InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
434                 .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
435                 .child(Group.class, groupKey).build();
436         Runnable notifyTask = new NotifyTask();
437         GroupInfoKey groupInfoKey = new GroupInfoKey(dpId, groupId);
438         synchronized (groupInfoKey.toString().intern()) {
439             s_logger.trace("syncsetupGroupKey groupKey {}", groupInfoKey);
440             groupMap.put(groupInfoKey, notifyTask);
441             if (isRemove) {
442                 MDSALUtil.syncDelete(m_dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
443             } else {
444                 MDSALUtil.syncWrite(m_dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
445             }
446             synchronized (notifyTask) {
447                 try {
448                     notifyTask.wait(delayTime);
449                 } catch (InterruptedException e){}
450             }
451         }
452     }
453
454     class GroupListener extends AbstractDataChangeListener<Group> {
455
456         public GroupListener() {
457             super(Group.class);
458         }
459
460         @Override
461         protected void remove(InstanceIdentifier<Group> identifier, Group del) {
462             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
463             executeNotifyTaskIfRequired(dpId, del);
464         }
465
466         private void executeNotifyTaskIfRequired(BigInteger dpId, Group group) {
467             GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue());
468             Runnable notifyTask = groupMap.remove(groupKey);
469             if (notifyTask == null) {
470                 return;
471             }
472             executorService.execute(notifyTask);
473         }
474
475         @Override
476         protected void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
477             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
478             executeNotifyTaskIfRequired(dpId, update);
479         }
480
481         @Override
482         protected void add(InstanceIdentifier<Group> identifier, Group add) {
483             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
484             executeNotifyTaskIfRequired(dpId, add);
485         }
486     }
487     
488     class FlowListener extends AbstractDataChangeListener<Flow> {
489
490         public FlowListener() {
491             super(Flow.class);
492         }
493
494         @Override
495         protected void remove(InstanceIdentifier<Flow> identifier, Flow del) {
496             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
497             notifyTaskIfRequired(dpId, del);
498         }
499
500         private void notifyTaskIfRequired(BigInteger dpId, Flow flow) {
501             FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId(), flow.getMatch(), flow.getId().getValue());
502             Runnable notifyTask = flowMap.remove(flowKey);
503             if (notifyTask == null) {
504                 return;
505             }
506             executorService.execute(notifyTask);
507         }
508
509         @Override
510         protected void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
511         }
512
513         @Override
514         protected void add(InstanceIdentifier<Flow> identifier, Flow add) {
515             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
516             notifyTaskIfRequired(dpId, add);
517         }
518     }
519     
520     private BigInteger getDpnFromString(String dpnString) {
521         String[] split = dpnString.split(":");
522         return new BigInteger(split[1]);
523     }
524
525 }