Bug 5308
[vpnservice.git] / mdsalutil / mdsalutil-impl / src / main / java / org / opendaylight / vpnservice / mdsalutil / internal / MDSALManager.java
1 /*
2  * Copyright (c) 2015 - 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.vpnservice.mdsalutil.internal;
10
11 import java.math.BigInteger;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18
19 import org.opendaylight.vpnservice.mdsalutil.ActionInfo;
20 import org.opendaylight.vpnservice.mdsalutil.ActionType;
21 import org.opendaylight.vpnservice.mdsalutil.FlowEntity;
22 import org.opendaylight.vpnservice.mdsalutil.GroupEntity;
23 import org.opendaylight.vpnservice.mdsalutil.MDSALUtil;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.opendaylight.vpnservice.mdsalutil.*;
51 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
52 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
53 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
54 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
55 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
56 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
57 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
58
59 import com.google.common.util.concurrent.CheckedFuture;
60 import com.google.common.util.concurrent.FutureCallback;
61 import com.google.common.util.concurrent.Futures;
62
63 public class MDSALManager implements AutoCloseable {
64
65     private static final Logger s_logger = LoggerFactory.getLogger(MDSALManager.class);
66
67     private DataBroker m_dataBroker;
68
69     private PacketProcessingService m_packetProcessingService;
70     private ListenerRegistration<DataChangeListener> groupListenerRegistration;
71     private ListenerRegistration<DataChangeListener> flowListenerRegistration;
72     private ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<FlowInfoKey, Runnable>();
73     private ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<GroupInfoKey, Runnable> ();
74     private ExecutorService executorService = Executors.newSingleThreadExecutor();
75
76     /**
77      * Writes the flows and Groups to the MD SAL DataStore
78      * which will be sent to the openflowplugin for installing flows/groups on the switch.
79      * Other modules of VPN service that wants to install flows / groups on the switch
80      * uses this utility
81      *
82      * @param db - dataBroker reference
83      * @param pktProcService- PacketProcessingService for sending the packet outs
84      */
85     public MDSALManager(final DataBroker db, PacketProcessingService pktProcService) {
86         m_dataBroker = db;
87         m_packetProcessingService = pktProcService;
88         registerListener(db);
89         s_logger.info( "MDSAL Manager Initialized ") ;
90     }
91
92     @Override
93     public void close() throws Exception {
94         groupListenerRegistration.close();
95         flowListenerRegistration.close();
96         s_logger.info("MDSAL Manager Closed");
97     }
98
99     private void registerListener(DataBroker db) {
100         try {
101             flowListenerRegistration = db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, getWildCardFlowPath(),
102                                                                         new FlowListener(),
103                                                                         DataChangeScope.SUBTREE);
104             groupListenerRegistration = db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, getWildCardGroupPath(),
105                                                                         new GroupListener(),
106                                                                         DataChangeScope.SUBTREE);
107         } catch (final Exception e) {
108             s_logger.error("GroupEventHandler: DataChange listener registration fail!", e);
109             throw new IllegalStateException("GroupEventHandler: registration Listener failed.", e);
110         }
111     }
112
113     private InstanceIdentifier<Group> getWildCardGroupPath() {
114         return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).child(Group.class);
115     }
116
117     private InstanceIdentifier<Flow> getWildCardFlowPath() {
118         return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class);
119     }
120
121     public void installFlow(FlowEntity flowEntity) {
122
123         try {
124             s_logger.trace("InstallFlow for flowEntity {} ", flowEntity);
125
126             if (flowEntity.getCookie() == null) {
127                flowEntity.setCookie(new BigInteger("0110000", 16));
128             }
129
130             FlowKey flowKey = new FlowKey( new FlowId(flowEntity.getFlowId()) );
131
132             FlowBuilder flowbld = flowEntity.getFlowBuilder();
133
134             Node nodeDpn = buildDpnNode(flowEntity.getDpnId());
135             InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
136                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
137                     .child(Table.class, new TableKey(flowEntity.getTableId())).child(Flow.class,flowKey).build();
138
139             WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
140
141             modification.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flowbld.build(),true );
142
143             CheckedFuture<Void,TransactionCommitFailedException> submitFuture  = modification.submit();
144
145             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
146
147                 @Override
148                 public void onSuccess(final Void result) {
149                     // Commited successfully
150                     s_logger.debug( "Install Flow -- Committedsuccessfully ") ;
151                 }
152
153                 @Override
154                 public void onFailure(final Throwable t) {
155                     // Transaction failed
156
157                     if(t instanceof OptimisticLockFailedException) {
158                         // Failed because of concurrent transaction modifying same data
159                         s_logger.error( "Install Flow -- Failed because of concurrent transaction modifying same data ") ;
160                     } else {
161                        // Some other type of TransactionCommitFailedException
162                         s_logger.error( "Install Flow -- Some other type of TransactionCommitFailedException " + t) ;
163                     }
164                 }
165             });
166         } catch (Exception e) {
167             s_logger.error("Could not install flow: {}", flowEntity, e);
168         }
169     }
170
171     public CheckedFuture<Void,TransactionCommitFailedException> installFlow(BigInteger dpId, Flow flow) {
172         FlowKey flowKey = new FlowKey( new FlowId(flow.getId()) );
173         Node nodeDpn = buildDpnNode(dpId);
174         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
175                 .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
176                 .child(Table.class, new TableKey(flow.getTableId())).child(Flow.class,flowKey).build();
177         WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
178         modification.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow, true);
179         return modification.submit();
180     }
181
182     public void installGroup(GroupEntity groupEntity) {
183         try {
184             Group group = groupEntity.getGroupBuilder().build();
185
186             Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
187
188             InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
189                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
190                     .child(Group.class, new GroupKey(new GroupId(groupEntity.getGroupId()))).build();
191
192             WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
193
194             modification.put(LogicalDatastoreType.CONFIGURATION, groupInstanceId, group, true);
195
196             CheckedFuture<Void,TransactionCommitFailedException> submitFuture  = modification.submit();
197
198             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
199                 @Override
200                 public void onSuccess(final Void result) {
201                     // Commited successfully
202                     s_logger.debug( "Install Group -- Committedsuccessfully ") ;
203                 }
204
205                 @Override
206                 public void onFailure(final Throwable t) {
207                     // Transaction failed
208
209                     if(t instanceof OptimisticLockFailedException) {
210                         // Failed because of concurrent transaction modifying same data
211                         s_logger.error( "Install Group -- Failed because of concurrent transaction modifying same data ") ;
212                     } else {
213                        // Some other type of TransactionCommitFailedException
214                         s_logger.error( "Install Group -- Some other type of TransactionCommitFailedException " + t) ;
215                     }
216                 }
217              });
218            } catch (Exception e) {
219             s_logger.error("Could not install Group: {}", groupEntity, e);
220             throw e;
221         }
222     }
223
224     public void removeFlow(FlowEntity flowEntity) {
225         try {
226             s_logger.debug("Remove flow {}",flowEntity);
227             Node nodeDpn = buildDpnNode(flowEntity.getDpnId());
228             FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
229             InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
230                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
231                     .child(Table.class, new TableKey(flowEntity.getTableId())).child(Flow.class, flowKey).build();
232
233
234                 WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
235                 modification.delete(LogicalDatastoreType.CONFIGURATION,flowInstanceId);
236
237                 CheckedFuture<Void,TransactionCommitFailedException> submitFuture  = modification.submit();
238
239                 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
240                     @Override
241                     public void onSuccess(final Void result) {
242                         // Commited successfully
243                         s_logger.debug( "Delete Flow -- Committedsuccessfully ") ;
244                     }
245
246                     @Override
247                     public void onFailure(final Throwable t) {
248                         // Transaction failed
249                         if(t instanceof OptimisticLockFailedException) {
250                             // Failed because of concurrent transaction modifying same data
251                             s_logger.error( "Delete Flow -- Failed because of concurrent transaction modifying same data ") ;
252                         } else {
253                            // Some other type of TransactionCommitFailedException
254                             s_logger.error( "Delete Flow -- Some other type of TransactionCommitFailedException " + t) ;
255                         }
256                     }
257
258                 });
259         } catch (Exception e) {
260             s_logger.error("Could not remove Flow: {}", flowEntity, e);
261         }
262     }
263
264     public CheckedFuture<Void,TransactionCommitFailedException> removeFlowNew(BigInteger dpnId, Flow flowEntity) {
265         s_logger.debug("Remove flow {}",flowEntity);
266         Node nodeDpn = buildDpnNode(dpnId);
267         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getId()));
268         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
269                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
270                     .child(Table.class, new TableKey(flowEntity.getTableId())).child(Flow.class, flowKey).build();
271         WriteTransaction  modification = m_dataBroker.newWriteOnlyTransaction();
272         modification.delete(LogicalDatastoreType.CONFIGURATION,flowInstanceId );
273         return modification.submit();
274     }
275
276     public void removeGroup(GroupEntity groupEntity) {
277         try {
278             Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
279             InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
280                     .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
281                     .child(Group.class, new GroupKey(new GroupId(groupEntity.getGroupId()))).build();
282
283             WriteTransaction modification = m_dataBroker.newWriteOnlyTransaction();
284
285             modification.delete(LogicalDatastoreType.CONFIGURATION,groupInstanceId );
286
287             CheckedFuture<Void,TransactionCommitFailedException> submitFuture  = modification.submit();
288
289             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
290                 @Override
291                 public void onSuccess(final Void result) {
292                     // Commited successfully
293                     s_logger.debug( "Install Group -- Committedsuccessfully ") ;
294                 }
295
296                 @Override
297                 public void onFailure(final Throwable t) {
298                     // Transaction failed
299                     if(t instanceof OptimisticLockFailedException) {
300                         // Failed because of concurrent transaction modifying same data
301                         s_logger.error( "Install Group -- Failed because of concurrent transaction modifying same data ") ;
302                     } else {
303                        // Some other type of TransactionCommitFailedException
304                         s_logger.error( "Install Group -- Some other type of TransactionCommitFailedException " + t) ;
305                     }
306                 }
307             });
308         } catch (Exception e) {
309             s_logger.error("Could not remove Group: {}", groupEntity, e);
310         }
311     }
312
313     public void modifyGroup(GroupEntity groupEntity) {
314
315         installGroup(groupEntity);
316     }
317
318     public void sendPacketOut(BigInteger dpnId, int groupId, byte[] payload) {
319
320         List<ActionInfo> actionInfos = new ArrayList<ActionInfo>();
321         actionInfos.add(new ActionInfo(ActionType.group, new String[] { String.valueOf(groupId) }));
322
323         sendPacketOutWithActions(dpnId, groupId, payload, actionInfos);
324     }
325
326     public void sendPacketOutWithActions(BigInteger dpnId, long groupId, byte[] payload, List<ActionInfo> actionInfos) {
327
328         m_packetProcessingService.transmitPacket(MDSALUtil.getPacketOut(actionInfos, payload, dpnId,
329                 getNodeConnRef("openflow:" + dpnId, "0xfffffffd")));
330     }
331
332     public void sendARPPacketOutWithActions(BigInteger dpnId, byte[] payload, List<ActionInfo> actions) {
333         m_packetProcessingService.transmitPacket(MDSALUtil.getPacketOut(actions, payload, dpnId,
334                 getNodeConnRef("openflow:" + dpnId, "0xfffffffd")));
335     }
336
337     public InstanceIdentifier<Node> nodeToInstanceId(Node node) {
338         return InstanceIdentifier.builder(Nodes.class).child(Node.class, node.getKey()).toInstance();
339     }
340
341     private static NodeConnectorRef getNodeConnRef(final String nodeId, final String port) {
342         StringBuilder _stringBuilder = new StringBuilder(nodeId);
343         StringBuilder _append = _stringBuilder.append(":");
344         StringBuilder sBuild = _append.append(port);
345         String _string = sBuild.toString();
346         NodeConnectorId _nodeConnectorId = new NodeConnectorId(_string);
347         NodeConnectorKey _nodeConnectorKey = new NodeConnectorKey(_nodeConnectorId);
348         NodeConnectorKey nConKey = _nodeConnectorKey;
349         InstanceIdentifierBuilder<Nodes> _builder = InstanceIdentifier.<Nodes> builder(Nodes.class);
350         NodeId _nodeId = new NodeId(nodeId);
351         NodeKey _nodeKey = new NodeKey(_nodeId);
352         InstanceIdentifierBuilder<Node> _child = _builder.<Node, NodeKey> child(Node.class, _nodeKey);
353         InstanceIdentifierBuilder<NodeConnector> _child_1 = _child.<NodeConnector, NodeConnectorKey> child(
354                 NodeConnector.class, nConKey);
355         InstanceIdentifier<NodeConnector> path = _child_1.toInstance();
356         NodeConnectorRef _nodeConnectorRef = new NodeConnectorRef(path);
357         return _nodeConnectorRef;
358     }
359
360     private Node buildDpnNode(BigInteger dpnId) {
361         NodeId nodeId = new NodeId("openflow:" + dpnId);
362         Node nodeDpn = new NodeBuilder().setId(nodeId).setKey(new NodeKey(nodeId)).build();
363
364         return nodeDpn;
365     }
366
367     public void syncSetUpFlow(FlowEntity flowEntity, long delay, boolean isRemove) {
368         s_logger.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
369         if (flowEntity.getCookie() == null) {
370             flowEntity.setCookie(new BigInteger("0110000", 16));
371         }
372         Flow flow = flowEntity.getFlowBuilder().build();
373         String flowId = flowEntity.getFlowId();
374         BigInteger dpId = flowEntity.getDpnId();
375         short tableId = flowEntity.getTableId();
376         Match matches = flow.getMatch();
377         FlowKey flowKey = new FlowKey( new FlowId(flowId));
378         Node nodeDpn = buildDpnNode(dpId);
379         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
380                 .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
381                 .child(Table.class, new TableKey(flow.getTableId())).child(Flow.class, flowKey).build();
382         Runnable notifyTask = new NotifyTask();
383         FlowInfoKey flowInfoKey = new FlowInfoKey(dpId, tableId, matches, flowId);
384         synchronized (flowInfoKey.toString().intern()) {
385             flowMap.put(flowInfoKey, notifyTask);
386             if (isRemove) {
387                 MDSALUtil.syncDelete(m_dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
388             } else {
389                 MDSALUtil.syncWrite(m_dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
390             }
391             synchronized (notifyTask) {
392                 try {
393                     notifyTask.wait(delay);
394                 } catch (InterruptedException e){}
395             }
396         }
397     }
398
399     public void syncSetUpGroup(GroupEntity groupEntity, long delayTime, boolean isRemove) {
400         s_logger.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
401         Group group = groupEntity.getGroupBuilder().build();
402         BigInteger dpId = groupEntity.getDpnId();
403         Node nodeDpn = buildDpnNode(dpId);
404         long groupId = groupEntity.getGroupId();
405         GroupKey groupKey = new GroupKey(new GroupId(groupId));
406         InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
407                 .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
408                 .child(Group.class, groupKey).build();
409         Runnable notifyTask = new NotifyTask();
410         GroupInfoKey groupInfoKey = new GroupInfoKey(dpId, groupId);
411         synchronized (groupInfoKey.toString().intern()) {
412             s_logger.trace("syncsetupGroupKey groupKey {}", groupInfoKey);
413             groupMap.put(groupInfoKey, notifyTask);
414             if (isRemove) {
415                 MDSALUtil.syncDelete(m_dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
416             } else {
417                 MDSALUtil.syncWrite(m_dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
418             }
419             synchronized (notifyTask) {
420                 try {
421                     notifyTask.wait(delayTime);
422                 } catch (InterruptedException e){}
423             }
424         }
425     }
426
427     public void syncSetUpGroup(BigInteger dpId, Group group, long delayTime, boolean isRemove) {
428         s_logger.trace("syncSetUpGroup for group {} ", group);
429         Node nodeDpn = buildDpnNode(dpId);
430         long groupId = group.getGroupId().getValue();
431         GroupKey groupKey = new GroupKey(new GroupId(groupId));
432         InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
433                 .child(Node.class, nodeDpn.getKey()).augmentation(FlowCapableNode.class)
434                 .child(Group.class, groupKey).build();
435         Runnable notifyTask = new NotifyTask();
436         GroupInfoKey groupInfoKey = new GroupInfoKey(dpId, groupId);
437         synchronized (groupInfoKey.toString().intern()) {
438             s_logger.trace("syncsetupGroupKey groupKey {}", groupInfoKey);
439             groupMap.put(groupInfoKey, notifyTask);
440             if (isRemove) {
441                 MDSALUtil.syncDelete(m_dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
442             } else {
443                 MDSALUtil.syncWrite(m_dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
444             }
445             synchronized (notifyTask) {
446                 try {
447                     notifyTask.wait(delayTime);
448                 } catch (InterruptedException e){}
449             }
450         }
451     }
452
453     class GroupListener extends AbstractDataChangeListener<Group> {
454
455         public GroupListener() {
456             super(Group.class);
457         }
458
459         @Override
460         protected void remove(InstanceIdentifier<Group> identifier, Group del) {
461             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
462             executeNotifyTaskIfRequired(dpId, del);
463         }
464
465         private void executeNotifyTaskIfRequired(BigInteger dpId, Group group) {
466             GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue());
467             Runnable notifyTask = groupMap.remove(groupKey);
468             if (notifyTask == null) {
469                 return;
470             }
471             executorService.execute(notifyTask);
472         }
473
474         @Override
475         protected void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
476             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
477             executeNotifyTaskIfRequired(dpId, update);
478         }
479
480         @Override
481         protected void add(InstanceIdentifier<Group> identifier, Group add) {
482             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
483             executeNotifyTaskIfRequired(dpId, add);
484         }
485     }
486     
487     class FlowListener extends AbstractDataChangeListener<Flow> {
488
489         public FlowListener() {
490             super(Flow.class);
491         }
492
493         @Override
494         protected void remove(InstanceIdentifier<Flow> identifier, Flow del) {
495             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
496             notifyTaskIfRequired(dpId, del);
497         }
498
499         private void notifyTaskIfRequired(BigInteger dpId, Flow flow) {
500             FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId(), flow.getMatch(), flow.getId().getValue());
501             Runnable notifyTask = flowMap.remove(flowKey);
502             if (notifyTask == null) {
503                 return;
504             }
505             executorService.execute(notifyTask);
506         }
507
508         @Override
509         protected void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
510         }
511
512         @Override
513         protected void add(InstanceIdentifier<Flow> identifier, Flow add) {
514             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class, NodeKey.class).getId().getValue());
515             notifyTaskIfRequired(dpId, add);
516         }
517     }
518     
519     private BigInteger getDpnFromString(String dpnString) {
520         String[] split = dpnString.split(":");
521         return new BigInteger(split[1]);
522     }
523
524 }