2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.mdsalutil.internal;
11 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
29 import org.opendaylight.genius.infra.Datastore;
30 import org.opendaylight.genius.infra.Datastore.Configuration;
31 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
32 import org.opendaylight.genius.infra.TypedReadTransaction;
33 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
34 import org.opendaylight.genius.infra.TypedWriteTransaction;
35 import org.opendaylight.genius.mdsalutil.FlowEntity;
36 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
37 import org.opendaylight.genius.mdsalutil.GroupEntity;
38 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
39 import org.opendaylight.genius.mdsalutil.MDSALUtil;
40 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
41 import org.opendaylight.infrautils.inject.AbstractLifecycle;
42 import org.opendaylight.infrautils.utils.concurrent.Executors;
43 import org.opendaylight.infrautils.utils.concurrent.NamedLocks;
44 import org.opendaylight.infrautils.utils.concurrent.NamedSimpleReentrantLock.Acquired;
45 import org.opendaylight.mdsal.binding.api.DataBroker;
46 import org.opendaylight.mdsal.binding.api.WriteTransaction;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
49 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
70 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
71 import org.opendaylight.yangtools.yang.common.Uint64;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
76 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
77 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
78 private static final class FlowLockKey {
79 private final Uint64 dpId;
80 private final FlowKey flowKey;
81 private final short tableId;
83 FlowLockKey(Uint64 dpId, short tableId, FlowKey flowKey) {
85 this.tableId = tableId;
86 this.flowKey = flowKey;
90 public int hashCode() {
91 return 31 * Short.hashCode(tableId) + Objects.hash(dpId, flowKey);
95 public boolean equals(Object obj) {
99 if (!(obj instanceof FlowLockKey)) {
102 final FlowLockKey other = (FlowLockKey) obj;
103 return tableId == other.tableId && Objects.equals(dpId, other.dpId)
104 && Objects.equals(flowKey, other.flowKey);
108 private static final class GroupLockKey {
109 private final Uint64 dpId;
110 private final long groupId;
112 GroupLockKey(long groupId, Uint64 dpId) {
113 this.groupId = groupId;
118 public int hashCode() {
119 return 31 * Long.hashCode(groupId) + Objects.hashCode(dpId);
123 public boolean equals(Object obj) {
127 if (!(obj instanceof GroupLockKey)) {
130 final GroupLockKey other = (GroupLockKey) obj;
131 return groupId == other.groupId && Objects.equals(dpId, other.dpId);
135 private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
136 private static final NamedLocks<FlowLockKey> FLOW_LOCKS = new NamedLocks<>();
137 private static final NamedLocks<GroupLockKey> GROUP_LOCKS = new NamedLocks<>();
139 private final DataBroker dataBroker;
140 private final RetryingManagedNewTransactionRunner txRunner;
141 private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
143 private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
144 private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
145 private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
146 private final SingleTransactionDataBroker singleTxDb;
147 private final FlowListener flowListener = new FlowListener();
148 private final FlowConfigListener flowConfigListener = new FlowConfigListener();
149 private final GroupListener groupListener = new GroupListener();
152 * Writes the flows and Groups to the MD SAL DataStore which will be sent to
153 * the openflowplugin for installing flows/groups on the switch. Other
154 * modules of VPN service that wants to install flows / groups on the switch
158 * dataBroker reference
159 * @param pktProcService
160 * PacketProcessingService for sending the packet outs
163 public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
168 public MDSALManager(DataBroker db) {
169 this.dataBroker = db;
170 this.txRunner = new RetryingManagedNewTransactionRunner(db);
171 singleTxDb = new SingleTransactionDataBroker(dataBroker);
172 LOG.info("MDSAL Manager Initialized ");
176 protected void start() {
177 LOG.info("{} start", getClass().getSimpleName());
179 int batchSize = Integer.getInteger("batch.size", 1000);
180 int batchInterval = Integer.getInteger("batch.wait.time", 500);
182 flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
186 protected void stop() {
187 LOG.info("{} stop", getClass().getSimpleName());
189 flowListener.close();
190 flowConfigListener.close();
191 groupListener.close();
195 FluentFuture<Void> installFlowInternal(FlowEntity flowEntity) {
196 return addCallBackForInstallFlowAndReturn(txRunner
197 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
198 tx -> writeFlowEntityInternal(flowEntity, tx)));
201 private FluentFuture<Void> installFlowInternal(Uint64 dpId, Flow flow) {
202 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
203 tx -> writeFlowInternal(dpId, flow, tx));
206 private static void writeFlowEntityInternal(FlowEntity flowEntity,
207 TypedWriteTransaction<Datastore.Configuration> tx) {
208 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
209 FlowBuilder flowbld = flowEntity.getFlowBuilder();
210 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
211 flowEntity.getTableId(), flowKey);
212 tx.mergeParentStructurePut(flowInstanceId, flowbld.build());
215 private static void writeFlowInternal(Uint64 dpId, Flow flow,
216 TypedWriteTransaction<Datastore.Configuration> tx) {
217 FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
218 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
219 flow.getTableId().toJava(), flowKey);
220 tx.mergeParentStructurePut(flowInstanceId, flow);
224 FluentFuture<Void> installGroupInternal(GroupEntity groupEntity) {
225 return addCallBackForInstallGroupAndReturn(txRunner
226 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
227 tx -> writeGroupEntityInternal(groupEntity, tx)));
230 private static void writeGroupEntityInternal(GroupEntity groupEntity,
231 TypedWriteTransaction<Datastore.Configuration> tx) {
232 Group group = groupEntity.getGroupBuilder().build();
233 Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
234 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
235 tx.mergeParentStructurePut(groupInstanceId, group);
239 FluentFuture<Void> removeFlowInternal(FlowEntity flowEntity) {
240 return addCallBackForDeleteFlowAndReturn(txRunner
241 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
242 tx -> deleteFlowEntityInternal(flowEntity, tx)));
245 private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
246 Uint64 dpId = flowEntity.getDpnId();
247 short tableId = flowEntity.getTableId();
248 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
249 deleteFlow(dpId, tableId, flowKey, tx);
252 private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey, WriteTransaction tx) {
253 if (flowExists(dpId, tableId, flowKey)) {
254 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
255 tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
257 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
261 private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey,
262 TypedWriteTransaction<Datastore.Configuration> tx) {
263 if (flowExists(dpId, tableId, flowKey)) {
264 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
265 tx.delete(flowInstanceId);
267 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
271 private FluentFuture<Void> removeFlowNewInternal(Uint64 dpnId, Flow flowEntity) {
272 LOG.debug("Remove flow {}", flowEntity);
273 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
275 FlowKey flowKey = new FlowKey(flowEntity.getId());
276 short tableId = flowEntity.getTableId().toJava();
277 deleteFlow(dpnId, tableId, flowKey, tx);
282 FluentFuture<Void> removeGroupInternal(Uint64 dpnId, long groupId) {
283 return addCallBackForInstallGroupAndReturn(txRunner
284 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
285 tx -> removeGroupInternal(dpnId, groupId, tx)));
288 private void removeGroupInternal(Uint64 dpnId, long groupId,
289 TypedWriteTransaction<Datastore.Configuration> tx) {
290 Node nodeDpn = buildDpnNode(dpnId);
291 if (groupExists(nodeDpn, groupId)) {
292 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
293 tx.delete(groupInstanceId);
295 LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
299 private static Node buildDpnNode(Uint64 dpnId) {
300 NodeId nodeId = new NodeId("openflow:" + dpnId);
301 Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
306 private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
307 if (LOG.isTraceEnabled()) {
308 LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
310 Flow flow = flowEntity.getFlowBuilder().build();
311 String flowId = flowEntity.getFlowId();
312 short tableId = flowEntity.getTableId();
313 Uint64 dpId = flowEntity.getDpnId();
314 FlowKey flowKey = new FlowKey(new FlowId(flowId));
315 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
318 try (Acquired lock = FLOW_LOCKS.acquire(new FlowLockKey(dpId, tableId, flowKey))) {
319 if (flowExists(dpId, tableId, flowKey)) {
320 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
322 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
326 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
330 private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
331 if (LOG.isTraceEnabled()) {
332 LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
334 Group group = groupEntity.getGroupBuilder().build();
335 Uint64 dpId = groupEntity.getDpnId();
336 long groupId = groupEntity.getGroupId();
337 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
340 try (Acquired lock = GROUP_LOCKS.acquire(new GroupLockKey(groupId, dpId))) {
341 if (groupExists(dpId, groupId)) {
342 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
344 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
348 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
352 private class GroupListener extends AbstractClusteredAsyncDataTreeChangeListener<Group> {
355 super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
356 .augmentation(FlowCapableNode.class).child(Group.class),
357 Executors.newSingleThreadExecutor("GroupListener", LOG));
361 public void remove(InstanceIdentifier<Group> identifier, Group del) {
362 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
363 executeNotifyTaskIfRequired(dpId, del);
366 private void executeNotifyTaskIfRequired(Uint64 dpId, Group group) {
367 GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue().toJava());
368 Runnable notifyTask = groupMap.remove(groupKey);
369 if (notifyTask == null) {
372 executorService.execute(notifyTask);
376 public void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
377 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
378 executeNotifyTaskIfRequired(dpId, update);
382 public void add(InstanceIdentifier<Group> identifier, Group add) {
383 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
384 executeNotifyTaskIfRequired(dpId, add);
388 private class FlowListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
391 super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class).child(Node.class)
392 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
393 Executors.newSingleThreadExecutor("FlowListener", LOG));
397 public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
398 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
399 notifyTaskIfRequired(dpId, del);
402 private void notifyTaskIfRequired(Uint64 dpId, Flow flow) {
403 FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId().toJava(),
404 flow.getMatch(), flow.getId().getValue());
405 Runnable notifyTask = flowMap.remove(flowKey);
406 if (notifyTask == null) {
409 executorService.execute(notifyTask);
413 public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
417 public void add(InstanceIdentifier<Flow> identifier, Flow add) {
418 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
419 notifyTaskIfRequired(dpId, add);
424 private class FlowConfigListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
425 private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
427 FlowConfigListener() {
428 super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
429 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
430 Executors.newSingleThreadExecutor("FlowConfigListener", LOG));
434 public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
435 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
436 flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
437 del.getId().getValue(), del.getTableId(), dpId);
441 public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
445 public void add(InstanceIdentifier<Flow> identifier, Flow add) {
446 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
447 flowLog.debug("FlowId {} added to Table {} on DPN {}",
448 add.getId().getValue(), add.getTableId(), dpId);
452 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
453 justification = "https://github.com/spotbugs/spotbugs/issues/811")
454 private static Uint64 getDpnFromString(String dpnString) {
455 String[] split = dpnString.split(":");
456 return Uint64.valueOf(split[1]);
460 public FluentFuture<Void> installFlow(FlowEntity flowEntity) {
461 return installFlowInternal(flowEntity);
465 public FluentFuture<Void> installFlow(Uint64 dpId, Flow flowEntity) {
466 return installFlowInternal(dpId, flowEntity);
470 public FluentFuture<Void> installFlow(Uint64 dpId, FlowEntity flowEntity) {
471 return installFlowInternal(dpId, flowEntity.getFlowBuilder().build());
475 public ListenableFuture<Void> removeFlow(Uint64 dpId, short tableId, FlowId flowId) {
476 ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
477 tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
479 Futures.addCallback(future, new FutureCallback<Void>() {
481 public void onSuccess(final Void result) {
482 // Committed successfully
483 LOG.debug("Delete Flow -- Committed successfully");
487 public void onFailure(final Throwable throwable) {
488 // Transaction failed
489 if (throwable instanceof OptimisticLockFailedException) {
490 // Failed because of concurrent transaction modifying same
492 LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
494 // Some other type of TransactionCommitFailedException
495 LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
499 }, MoreExecutors.directExecutor());
505 public FluentFuture<Void> removeFlow(Uint64 dpId, Flow flowEntity) {
506 return removeFlowNewInternal(dpId, flowEntity);
510 public FluentFuture<Void> removeFlow(FlowEntity flowEntity) {
511 return removeFlowInternal(flowEntity);
515 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
516 throws ExecutionException, InterruptedException {
517 removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
521 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow)
522 throws ExecutionException, InterruptedException {
523 removeFlow(tx, dpId, flow.key(), flow.getTableId().toJava());
527 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, String flowId, short tableId)
528 throws ExecutionException, InterruptedException {
529 removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
533 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, FlowKey flowKey,
534 short tableId) throws ExecutionException, InterruptedException {
535 InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
536 if (tx.read(flowInstanceIdentifier).get().isPresent()) {
537 tx.delete(flowInstanceIdentifier);
542 public void removeGroup(GroupEntity groupEntity) {
543 removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
547 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
548 throws ExecutionException, InterruptedException {
549 removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
553 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Group group)
554 throws ExecutionException, InterruptedException {
555 removeGroup(tx, dpId, group.getGroupId().getValue().toJava());
559 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId)
560 throws ExecutionException, InterruptedException {
561 Node nodeDpn = buildDpnNode(dpId);
562 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
563 if (tx.read(groupInstanceId).get().isPresent()) {
564 tx.delete(groupInstanceId);
566 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
571 public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
572 syncSetUpFlowInternal(flowEntity, true);
576 public void syncRemoveFlow(FlowEntity flowEntity) {
577 syncSetUpFlowInternal(flowEntity, true);
581 public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
582 syncSetUpFlowInternal(flowEntity, false);
586 public void syncInstallFlow(FlowEntity flowEntity) {
587 syncSetUpFlowInternal(flowEntity, false);
591 public void syncInstallGroup(GroupEntity groupEntity) {
592 syncSetUpGroupInternal(groupEntity, false);
596 public void syncRemoveGroup(GroupEntity groupEntity) {
597 syncSetUpGroupInternal(groupEntity, true);
601 public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
602 addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
606 public void addFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow) {
607 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
608 flow.getTableId().toJava(), flow.key());
609 tx.mergeParentStructurePut(flowInstanceId, flow);
613 public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
614 addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
618 public void addGroup(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Group group) {
619 Node nodeDpn = buildDpnNode(dpId);
620 long groupId = group.getGroupId().getValue().toJava();
621 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
622 tx.mergeParentStructurePut(groupInstanceId, group);
626 public void addBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, Bucket bucket)
627 throws ExecutionException, InterruptedException {
628 Node nodeDpn = buildDpnNode(dpId);
629 if (groupExists(tx, nodeDpn, groupId)) {
630 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
631 bucket.getBucketId().getValue().toJava(), nodeDpn);
632 tx.put(bucketInstanceId, bucket);
637 public void removeBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, long bucketId)
638 throws ExecutionException, InterruptedException {
639 Node nodeDpn = buildDpnNode(dpId);
640 if (groupExists(tx, nodeDpn, groupId)) {
641 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
642 tx.delete(bucketInstanceId);
644 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
649 public boolean groupExists(Uint64 dpId, long groupId) {
650 return groupExists(buildDpnNode(dpId), groupId);
653 private boolean groupExists(Node nodeDpn, long groupId) {
654 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
656 return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
657 } catch (ExecutionException | InterruptedException e) {
658 LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
663 private static boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
664 throws ExecutionException, InterruptedException {
665 return tx.exists(buildGroupInstanceIdentifier(groupId, nodeDpn)).get();
668 private static InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
669 InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
670 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
671 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
672 return groupInstanceId;
675 private boolean flowExists(Uint64 dpId, short tableId, FlowKey flowKey) {
676 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
678 Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
680 return flowOptional.isPresent();
681 } catch (ExecutionException | InterruptedException e) {
682 LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
687 private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(Uint64 dpnId, short tableId,
689 InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
690 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
691 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
692 return flowInstanceId;
695 private static InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
697 InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
698 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
699 .child(Group.class, new GroupKey(new GroupId(groupId)))
700 .child(Buckets.class)
701 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
702 return bucketInstanceId;
705 private static FluentFuture<Void> addCallBackForDeleteFlowAndReturn(FluentFuture<Void> fluentFuture) {
706 return callBack(fluentFuture, "Delete Flow");
709 private static FluentFuture<Void> addCallBackForInstallFlowAndReturn(FluentFuture<Void> fluentFuture) {
710 return callBack(fluentFuture, "Install Flow");
713 private static FluentFuture<Void> addCallBackForInstallGroupAndReturn(FluentFuture<Void> fluentFuture) {
714 return callBack(fluentFuture, "Install Group");
717 // Generic for handling callbacks
718 private static FluentFuture<Void> callBack(FluentFuture<Void> fluentFuture, String log) {
719 fluentFuture.addCallback(new FutureCallback<Void>() {
721 public void onSuccess(final Void result) {
722 // Committed successfully
723 LOG.debug("{} -- Committedsuccessfully ", log);
727 public void onFailure(final Throwable throwable) {
728 // Transaction failed
730 if (throwable instanceof OptimisticLockFailedException) {
731 // Failed because of concurrent transaction modifying same
733 LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
735 // Some other type of TransactionCommitFailedException
736 LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
739 }, MoreExecutors.directExecutor());