2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.genius.mdsalutil.internal;
10 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ExecutorService;
25 import javax.inject.Inject;
26 import javax.inject.Singleton;
27 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
28 import org.opendaylight.genius.mdsalutil.FlowEntity;
29 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
30 import org.opendaylight.genius.mdsalutil.GroupEntity;
31 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
32 import org.opendaylight.genius.mdsalutil.MDSALUtil;
33 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
34 import org.opendaylight.infrautils.inject.AbstractLifecycle;
35 import org.opendaylight.infrautils.utils.concurrent.Executors;
36 import org.opendaylight.infrautils.utils.concurrent.NamedLocks;
37 import org.opendaylight.infrautils.utils.concurrent.NamedSimpleReentrantLock.Acquired;
38 import org.opendaylight.mdsal.binding.api.DataBroker;
39 import org.opendaylight.mdsal.binding.util.Datastore;
40 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
41 import org.opendaylight.mdsal.binding.util.RetryingManagedNewTransactionRunner;
42 import org.opendaylight.mdsal.binding.util.TypedReadTransaction;
43 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
44 import org.opendaylight.mdsal.binding.util.TypedWriteTransaction;
45 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
46 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
47 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.common.Uint64;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
74 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
75 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
76 private static final class FlowLockKey {
77 private final Uint64 dpId;
78 private final FlowKey flowKey;
79 private final short tableId;
81 FlowLockKey(Uint64 dpId, short tableId, FlowKey flowKey) {
83 this.tableId = tableId;
84 this.flowKey = flowKey;
88 public int hashCode() {
89 return 31 * Short.hashCode(tableId) + Objects.hash(dpId, flowKey);
93 public boolean equals(Object obj) {
97 if (!(obj instanceof FlowLockKey)) {
100 final FlowLockKey other = (FlowLockKey) obj;
101 return tableId == other.tableId && Objects.equals(dpId, other.dpId)
102 && Objects.equals(flowKey, other.flowKey);
106 private static final class GroupLockKey {
107 private final Uint64 dpId;
108 private final long groupId;
110 GroupLockKey(long groupId, Uint64 dpId) {
111 this.groupId = groupId;
116 public int hashCode() {
117 return 31 * Long.hashCode(groupId) + Objects.hashCode(dpId);
121 public boolean equals(Object obj) {
125 if (!(obj instanceof GroupLockKey)) {
128 final GroupLockKey other = (GroupLockKey) obj;
129 return groupId == other.groupId && Objects.equals(dpId, other.dpId);
private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
// Named locks taken around the synchronous flow / group removal paths.
private static final NamedLocks<FlowLockKey> FLOW_LOCKS = new NamedLocks<>();
private static final NamedLocks<GroupLockKey> GROUP_LOCKS = new NamedLocks<>();

private final DataBroker dataBroker;
private final RetryingManagedNewTransactionRunner txRunner;
private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();

// Tasks keyed by flow / group; the listeners below remove and run the
// matching task when they see a change for that key.
private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
private final SingleTransactionDataBroker singleTxDb;
// The listeners are inner classes whose constructors read dataBroker, so they
// must be created only after dataBroker has been assigned. Field initializers
// run before the constructor body and would hand them a null broker.
private final FlowListener flowListener;
private final FlowConfigListener flowConfigListener;
private final GroupListener groupListener;

/**
 * Writes the flows and Groups to the MD SAL DataStore which will be sent to
 * the openflowplugin for installing flows/groups on the switch. Other
 * modules that want to install flows / groups on the switch use this class.
 *
 * @param db
 *            dataBroker reference
 * @param pktProcService
 *            PacketProcessingService for sending the packet outs; not used
 *            by this constructor
 */
@Inject
public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
    this(db);
}

/**
 * Creates the manager on top of the given data broker.
 *
 * @param db
 *            dataBroker reference used for all reads, writes and listeners
 */
@VisibleForTesting
public MDSALManager(DataBroker db) {
    this.dataBroker = db;
    this.txRunner = new RetryingManagedNewTransactionRunner(db);
    singleTxDb = new SingleTransactionDataBroker(dataBroker);
    // Safe to build the listeners now: dataBroker is set.
    this.flowListener = new FlowListener();
    this.flowConfigListener = new FlowConfigListener();
    this.groupListener = new GroupListener();
    LOG.info("MDSAL Manager Initialized ");
}
174 protected void start() {
175 LOG.info("{} start", getClass().getSimpleName());
177 int batchSize = Integer.getInteger("batch.size", 1000);
178 int batchInterval = Integer.getInteger("batch.wait.time", 500);
180 flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
184 protected void stop() {
185 LOG.info("{} stop", getClass().getSimpleName());
187 flowListener.close();
188 flowConfigListener.close();
189 groupListener.close();
193 FluentFuture<?> installFlowInternal(FlowEntity flowEntity) {
194 return addCallBackForInstallFlowAndReturn(txRunner
195 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
196 tx -> writeFlowEntityInternal(flowEntity, tx)));
199 private FluentFuture<?> installFlowInternal(Uint64 dpId, Flow flow) {
200 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
201 tx -> writeFlowInternal(dpId, flow, tx));
204 private static void writeFlowEntityInternal(FlowEntity flowEntity,
205 TypedWriteTransaction<Datastore.Configuration> tx) {
206 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
207 FlowBuilder flowbld = flowEntity.getFlowBuilder();
208 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
209 flowEntity.getTableId(), flowKey);
210 tx.mergeParentStructurePut(flowInstanceId, flowbld.build());
213 private static void writeFlowInternal(Uint64 dpId, Flow flow,
214 TypedWriteTransaction<Datastore.Configuration> tx) {
215 FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
216 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
217 flow.getTableId().toJava(), flowKey);
218 tx.mergeParentStructurePut(flowInstanceId, flow);
222 FluentFuture<?> installGroupInternal(GroupEntity groupEntity) {
223 return addCallBackForInstallGroupAndReturn(txRunner
224 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
225 tx -> writeGroupEntityInternal(groupEntity, tx)));
228 private static void writeGroupEntityInternal(GroupEntity groupEntity,
229 TypedWriteTransaction<Datastore.Configuration> tx) {
230 Group group = groupEntity.getGroupBuilder().build();
231 Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
232 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
233 tx.mergeParentStructurePut(groupInstanceId, group);
237 FluentFuture<?> removeFlowInternal(FlowEntity flowEntity) {
238 return addCallBackForDeleteFlowAndReturn(txRunner
239 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
240 tx -> deleteFlowEntityInternal(flowEntity, tx)));
243 private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
244 Uint64 dpId = flowEntity.getDpnId();
245 short tableId = flowEntity.getTableId();
246 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
247 deleteFlow(dpId, tableId, flowKey, tx);
250 private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey,
251 TypedWriteTransaction<Datastore.Configuration> tx) {
252 if (flowExists(dpId, tableId, flowKey)) {
253 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
254 tx.delete(flowInstanceId);
256 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
260 private FluentFuture<?> removeFlowNewInternal(Uint64 dpnId, Flow flowEntity) {
261 LOG.debug("Remove flow {}", flowEntity);
262 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
264 FlowKey flowKey = new FlowKey(flowEntity.getId());
265 short tableId = flowEntity.getTableId().toJava();
266 deleteFlow(dpnId, tableId, flowKey, tx);
271 FluentFuture<?> removeGroupInternal(Uint64 dpnId, long groupId) {
272 return addCallBackForInstallGroupAndReturn(txRunner
273 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
274 tx -> removeGroupInternal(dpnId, groupId, tx)));
277 private void removeGroupInternal(Uint64 dpnId, long groupId,
278 TypedWriteTransaction<Datastore.Configuration> tx) {
279 Node nodeDpn = buildDpnNode(dpnId);
280 if (groupExists(nodeDpn, groupId)) {
281 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
282 tx.delete(groupInstanceId);
284 LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
288 private static Node buildDpnNode(Uint64 dpnId) {
289 NodeId nodeId = new NodeId("openflow:" + dpnId);
290 Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
295 private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
296 if (LOG.isTraceEnabled()) {
297 LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
299 Flow flow = flowEntity.getFlowBuilder().build();
300 String flowId = flowEntity.getFlowId();
301 short tableId = flowEntity.getTableId();
302 Uint64 dpId = flowEntity.getDpnId();
303 FlowKey flowKey = new FlowKey(new FlowId(flowId));
304 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
307 try (Acquired lock = FLOW_LOCKS.acquire(new FlowLockKey(dpId, tableId, flowKey))) {
308 if (flowExists(dpId, tableId, flowKey)) {
309 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
311 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
315 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
319 private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
320 if (LOG.isTraceEnabled()) {
321 LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
323 Group group = groupEntity.getGroupBuilder().build();
324 Uint64 dpId = groupEntity.getDpnId();
325 long groupId = groupEntity.getGroupId();
326 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
329 try (Acquired lock = GROUP_LOCKS.acquire(new GroupLockKey(groupId, dpId))) {
330 if (groupExists(dpId, groupId)) {
331 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
333 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
337 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
341 private class GroupListener extends AbstractClusteredAsyncDataTreeChangeListener<Group> {
344 super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
345 .augmentation(FlowCapableNode.class).child(Group.class),
346 Executors.newSingleThreadExecutor("GroupListener", LOG));
350 public void remove(InstanceIdentifier<Group> identifier, Group del) {
351 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
352 executeNotifyTaskIfRequired(dpId, del);
355 private void executeNotifyTaskIfRequired(Uint64 dpId, Group group) {
356 GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue().toJava());
357 Runnable notifyTask = groupMap.remove(groupKey);
358 if (notifyTask == null) {
361 executorService.execute(notifyTask);
365 public void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
366 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
367 executeNotifyTaskIfRequired(dpId, update);
371 public void add(InstanceIdentifier<Group> identifier, Group add) {
372 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
373 executeNotifyTaskIfRequired(dpId, add);
377 private class FlowListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
380 super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class).child(Node.class)
381 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
382 Executors.newSingleThreadExecutor("FlowListener", LOG));
386 public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
387 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
388 notifyTaskIfRequired(dpId, del);
391 private void notifyTaskIfRequired(Uint64 dpId, Flow flow) {
392 FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId().toJava(),
393 flow.getMatch(), flow.getId().getValue());
394 Runnable notifyTask = flowMap.remove(flowKey);
395 if (notifyTask == null) {
398 executorService.execute(notifyTask);
402 public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
406 public void add(InstanceIdentifier<Flow> identifier, Flow add) {
407 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
408 notifyTaskIfRequired(dpId, add);
413 private class FlowConfigListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
414 private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
416 FlowConfigListener() {
417 super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
418 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
419 Executors.newSingleThreadExecutor("FlowConfigListener", LOG));
423 public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
424 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
425 flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
426 del.getId().getValue(), del.getTableId(), dpId);
430 public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
434 public void add(InstanceIdentifier<Flow> identifier, Flow add) {
435 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
436 flowLog.debug("FlowId {} added to Table {} on DPN {}",
437 add.getId().getValue(), add.getTableId(), dpId);
441 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
442 justification = "https://github.com/spotbugs/spotbugs/issues/811")
443 private static Uint64 getDpnFromString(String dpnString) {
444 String[] split = dpnString.split(":");
445 return Uint64.valueOf(split[1]);
449 public FluentFuture<?> installFlow(FlowEntity flowEntity) {
450 return installFlowInternal(flowEntity);
454 public FluentFuture<?> installFlow(Uint64 dpId, Flow flowEntity) {
455 return installFlowInternal(dpId, flowEntity);
459 public FluentFuture<?> installFlow(Uint64 dpId, FlowEntity flowEntity) {
460 return installFlowInternal(dpId, flowEntity.getFlowBuilder().build());
464 public ListenableFuture<?> removeFlow(Uint64 dpId, short tableId, FlowId flowId) {
465 ListenableFuture<?> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
466 tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
468 Futures.addCallback(future, new FutureCallback<Object>() {
470 public void onSuccess(final Object result) {
471 // Committed successfully
472 LOG.debug("Delete Flow -- Committed successfully");
476 public void onFailure(final Throwable throwable) {
477 // Transaction failed
478 if (throwable instanceof OptimisticLockFailedException) {
479 // Failed because of concurrent transaction modifying same
481 LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
483 // Some other type of TransactionCommitFailedException
484 LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
488 }, MoreExecutors.directExecutor());
494 public FluentFuture<?> removeFlow(Uint64 dpId, Flow flowEntity) {
495 return removeFlowNewInternal(dpId, flowEntity);
499 public FluentFuture<?> removeFlow(FlowEntity flowEntity) {
500 return removeFlowInternal(flowEntity);
504 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
505 throws ExecutionException, InterruptedException {
506 removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
510 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow)
511 throws ExecutionException, InterruptedException {
512 removeFlow(tx, dpId, flow.key(), flow.getTableId().toJava());
516 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, String flowId, short tableId)
517 throws ExecutionException, InterruptedException {
518 removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
522 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, FlowKey flowKey,
523 short tableId) throws ExecutionException, InterruptedException {
524 InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
525 if (tx.read(flowInstanceIdentifier).get().isPresent()) {
526 tx.delete(flowInstanceIdentifier);
531 public void removeGroup(GroupEntity groupEntity) {
532 removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
536 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
537 throws ExecutionException, InterruptedException {
538 removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
542 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Group group)
543 throws ExecutionException, InterruptedException {
544 removeGroup(tx, dpId, group.getGroupId().getValue().toJava());
548 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId)
549 throws ExecutionException, InterruptedException {
550 Node nodeDpn = buildDpnNode(dpId);
551 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
552 if (tx.read(groupInstanceId).get().isPresent()) {
553 tx.delete(groupInstanceId);
555 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
560 public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
561 syncSetUpFlowInternal(flowEntity, true);
565 public void syncRemoveFlow(FlowEntity flowEntity) {
566 syncSetUpFlowInternal(flowEntity, true);
570 public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
571 syncSetUpFlowInternal(flowEntity, false);
575 public void syncInstallFlow(FlowEntity flowEntity) {
576 syncSetUpFlowInternal(flowEntity, false);
580 public void syncInstallGroup(GroupEntity groupEntity) {
581 syncSetUpGroupInternal(groupEntity, false);
585 public void syncRemoveGroup(GroupEntity groupEntity) {
586 syncSetUpGroupInternal(groupEntity, true);
590 public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
591 addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
595 public void addFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow) {
596 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
597 flow.getTableId().toJava(), flow.key());
598 tx.mergeParentStructurePut(flowInstanceId, flow);
602 public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
603 addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
607 public void addGroup(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Group group) {
608 Node nodeDpn = buildDpnNode(dpId);
609 long groupId = group.getGroupId().getValue().toJava();
610 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
611 tx.mergeParentStructurePut(groupInstanceId, group);
615 public void addBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, Bucket bucket)
616 throws ExecutionException, InterruptedException {
617 Node nodeDpn = buildDpnNode(dpId);
618 if (groupExists(tx, nodeDpn, groupId)) {
619 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
620 bucket.getBucketId().getValue().toJava(), nodeDpn);
621 tx.put(bucketInstanceId, bucket);
626 public void removeBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, long bucketId)
627 throws ExecutionException, InterruptedException {
628 Node nodeDpn = buildDpnNode(dpId);
629 if (groupExists(tx, nodeDpn, groupId)) {
630 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
631 tx.delete(bucketInstanceId);
633 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
638 public boolean groupExists(Uint64 dpId, long groupId) {
639 return groupExists(buildDpnNode(dpId), groupId);
642 private boolean groupExists(Node nodeDpn, long groupId) {
643 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
645 return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
646 } catch (ExecutionException | InterruptedException e) {
647 LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
652 private static boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
653 throws ExecutionException, InterruptedException {
654 return tx.exists(buildGroupInstanceIdentifier(groupId, nodeDpn)).get();
657 private static InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
658 InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
659 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
660 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
661 return groupInstanceId;
664 private boolean flowExists(Uint64 dpId, short tableId, FlowKey flowKey) {
665 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
667 Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
669 return flowOptional.isPresent();
670 } catch (ExecutionException | InterruptedException e) {
671 LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
676 private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(Uint64 dpnId, short tableId,
678 InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
679 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
680 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
681 return flowInstanceId;
684 private static InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
686 InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
687 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
688 .child(Group.class, new GroupKey(new GroupId(groupId)))
689 .child(Buckets.class)
690 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
691 return bucketInstanceId;
694 private static FluentFuture<?> addCallBackForDeleteFlowAndReturn(FluentFuture<?> fluentFuture) {
695 return callBack(fluentFuture, "Delete Flow");
698 private static FluentFuture<?> addCallBackForInstallFlowAndReturn(FluentFuture<?> fluentFuture) {
699 return callBack(fluentFuture, "Install Flow");
702 private static FluentFuture<?> addCallBackForInstallGroupAndReturn(FluentFuture<?> fluentFuture) {
703 return callBack(fluentFuture, "Install Group");
706 // Generic for handling callbacks
707 private static FluentFuture<?> callBack(FluentFuture<?> fluentFuture, String log) {
708 fluentFuture.addCallback(new FutureCallback<Object>() {
710 public void onSuccess(final Object result) {
711 // Committed successfully
712 LOG.debug("{} -- Committedsuccessfully ", log);
716 public void onFailure(final Throwable throwable) {
717 // Transaction failed
719 if (throwable instanceof OptimisticLockFailedException) {
720 // Failed because of concurrent transaction modifying same
722 LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
724 // Some other type of TransactionCommitFailedException
725 LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
728 }, MoreExecutors.directExecutor());