package org.opendaylight.genius.mdsalutil.internal;
import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
+import static org.opendaylight.infrautils.utils.concurrent.FluentFutures2.toChecked;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.genius.infra.Datastore;
import org.opendaylight.genius.infra.Datastore.Configuration;
-import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
-import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
import org.opendaylight.genius.infra.TypedReadTransaction;
import org.opendaylight.genius.infra.TypedReadWriteTransaction;
import org.opendaylight.genius.infra.TypedWriteTransaction;
private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
private final DataBroker dataBroker;
- private final ManagedNewTransactionRunner txRunner;
+ private final RetryingManagedNewTransactionRunner txRunner;
private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
private final PacketProcessingService packetProcessingService;
@Inject
public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
this.dataBroker = db;
- this.txRunner = new ManagedNewTransactionRunnerImpl(db);
+ this.txRunner = new RetryingManagedNewTransactionRunner(db);
this.packetProcessingService = pktProcService;
singleTxDb = new SingleTransactionDataBroker(dataBroker);
LOG.info("MDSAL Manager Initialized ");
}
@VisibleForTesting
- CheckedFuture<Void, TransactionCommitFailedException> installFlowInternal(FlowEntity flowEntity) {
- WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
- LOG.trace("InstallFlow for flowEntity {} ", flowEntity);
-
- writeFlowEntityInternal(flowEntity, tx);
-
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-
- Futures.addCallback(submitFuture, new FutureCallback<Void>() {
-
- @Override
- public void onSuccess(final Void result) {
- // Committed successfully
- LOG.debug("Install Flow -- Committedsuccessfully ");
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- // Transaction failed
-
- if (throwable instanceof OptimisticLockFailedException) {
- // Failed because of concurrent transaction modifying same
- // data
- LOG.error("Install Flow -- Failed because of concurrent transaction modifying same data");
- } else {
- // Some other type of TransactionCommitFailedException
- LOG.error("Install Flow -- Some other type of TransactionCommitFailedException", throwable);
- }
- }
- }, MoreExecutors.directExecutor());
-
- return submitFuture;
+ FluentFuture<Void> installFlowInternal(FlowEntity flowEntity) {
+ return addCallBackForInstallFlowAndReturn(txRunner
+ .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
+ tx -> writeFlowEntityInternal(flowEntity, tx)));
}
- private CheckedFuture<Void, TransactionCommitFailedException> installFlowInternal(BigInteger dpId, Flow flow) {
- WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
- writeFlowInternal(dpId, flow, tx);
- return tx.submit();
+ private FluentFuture<Void> installFlowInternal(BigInteger dpId, Flow flow) {
+ return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
+ tx -> writeFlowInternal(dpId, flow, tx));
}
private void writeFlowEntityInternal(FlowEntity flowEntity, WriteTransaction tx) {
tx.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flowbld.build(), true);
}
+ private void writeFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
+ FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
+ FlowBuilder flowbld = flowEntity.getFlowBuilder();
+ InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
+ flowEntity.getTableId(), flowKey);
+ tx.put(flowInstanceId, flowbld.build(), true);
+ }
+
private void writeFlowInternal(BigInteger dpId, Flow flow, WriteTransaction tx) {
FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flowKey);
tx.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow, true);
}
+ private void writeFlowInternal(BigInteger dpId, Flow flow, TypedWriteTransaction<Datastore.Configuration> tx) {
+ FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
+ InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flowKey);
+ tx.put(flowInstanceId, flow, true);
+ }
+
private void batchedAddFlowInternal(BigInteger dpId, Flow flow) {
FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flowKey);
}
@VisibleForTesting
- CheckedFuture<Void, TransactionCommitFailedException> installGroupInternal(GroupEntity groupEntity) {
- WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
- writeGroupEntityInternal(groupEntity, tx);
-
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-
- Futures.addCallback(submitFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- // Committed successfully
- LOG.debug("Install Group -- Committedsuccessfully ");
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- // Transaction failed
-
- if (throwable instanceof OptimisticLockFailedException) {
- // Failed because of concurrent transaction modifying same
- // data
- LOG.error("Install Group -- Failed because of concurrent transaction modifying same data");
- } else {
- // Some other type of TransactionCommitFailedException
- LOG.error("Install Group -- Some other type of TransactionCommitFailedException", throwable);
- }
- }
- }, MoreExecutors.directExecutor());
-
- return submitFuture;
+ FluentFuture<Void> installGroupInternal(GroupEntity groupEntity) {
+ return addCallBackForInstallGroupAndReturn(txRunner
+ .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
+ tx -> writeGroupEntityInternal(groupEntity, tx)));
}
private void writeGroupEntityInternal(GroupEntity groupEntity, WriteTransaction tx) {
tx.put(LogicalDatastoreType.CONFIGURATION, groupInstanceId, group, true);
}
+ private void writeGroupEntityInternal(GroupEntity groupEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
+ Group group = groupEntity.getGroupBuilder().build();
+ Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
+ InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
+ tx.put(groupInstanceId, group, true);
+ }
+
private void writeGroupInternal(BigInteger dpId, Group group, WriteTransaction tx) {
Node nodeDpn = buildDpnNode(dpId);
long groupId = group.getGroupId().getValue();
}
@VisibleForTesting
- CheckedFuture<Void, TransactionCommitFailedException> removeFlowInternal(FlowEntity flowEntity) {
- WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
- deleteFlowEntityInternal(flowEntity, tx);
-
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-
- Futures.addCallback(submitFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- // Committed successfully
- LOG.debug("Delete Flow -- Committedsuccessfully ");
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- // Transaction failed
- if (throwable instanceof OptimisticLockFailedException) {
- // Failed because of concurrent transaction modifying same
- // data
- LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
- } else {
- // Some other type of TransactionCommitFailedException
- LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
- }
- }
-
- });
-
- return submitFuture;
+ FluentFuture<Void> removeFlowInternal(FlowEntity flowEntity) {
+ return addCallBackForDeleteFlowAndReturnm(txRunner
+ .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
+ tx -> deleteFlowEntityInternal(flowEntity, tx)));
}
private void deleteFlowEntityInternal(FlowEntity flowEntity, WriteTransaction tx) {
deleteFlow(dpId, tableId, flowKey, tx);
}
+ private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
+ BigInteger dpId = flowEntity.getDpnId();
+ short tableId = flowEntity.getTableId();
+ FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
+ deleteFlow(dpId, tableId, flowKey, tx);
+ }
+
private void deleteFlow(BigInteger dpId, short tableId, FlowKey flowKey, WriteTransaction tx) {
if (flowExists(dpId, tableId, flowKey)) {
InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
}
}
- private CheckedFuture<Void, TransactionCommitFailedException> removeFlowNewInternal(BigInteger dpnId,
- Flow flowEntity) {
+ private void deleteFlow(BigInteger dpId, short tableId, FlowKey flowKey,
+ TypedWriteTransaction<Datastore.Configuration> tx) {
+ if (flowExists(dpId, tableId, flowKey)) {
+ InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
+ tx.delete(flowInstanceId);
+ } else {
+ LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
+ }
+ }
+
+ private FluentFuture<Void> removeFlowNewInternal(BigInteger dpnId, Flow flowEntity) {
LOG.debug("Remove flow {}", flowEntity);
- WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
- deleteFlowInternal(dpnId, flowEntity, tx);
- return tx.submit();
+ return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
+ tx -> {
+ FlowKey flowKey = new FlowKey(flowEntity.getId());
+ short tableId = flowEntity.getTableId();
+ deleteFlow(dpnId, tableId, flowKey, tx);
+ });
}
private void deleteFlowInternal(BigInteger dpId, Flow flow, WriteTransaction tx) {
}
@VisibleForTesting
- CheckedFuture<Void, TransactionCommitFailedException> removeGroupInternal(BigInteger dpnId,
- long groupId) {
- WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
- removeGroupInternal(dpnId, groupId, tx);
-
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
-
- Futures.addCallback(submitFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- LOG.debug("Install Group -- Committedsuccessfully ");
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- if (throwable instanceof OptimisticLockFailedException) {
- LOG.error("Install Group -- Failed because of concurrent transaction modifying same data");
- } else {
- LOG.error("Install Group -- Some other type of TransactionCommitFailedException", throwable);
- }
- }
- });
-
- return submitFuture;
+ FluentFuture<Void> removeGroupInternal(BigInteger dpnId, long groupId) {
+ return addCallBackForInstallGroupAndReturn(txRunner
+ .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
+ tx -> removeGroupInternal(dpnId, groupId, tx)));
}
private void removeGroupInternal(BigInteger dpnId, long groupId, WriteTransaction tx) {
}
}
+ private void removeGroupInternal(BigInteger dpnId, long groupId,
+ TypedWriteTransaction<Datastore.Configuration> tx) {
+ Node nodeDpn = buildDpnNode(dpnId);
+ if (groupExists(nodeDpn, groupId)) {
+ InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
+ tx.delete(groupInstanceId);
+ } else {
+ LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
+ }
+ }
+
+
private void modifyGroupInternal(GroupEntity groupEntity) {
installGroup(groupEntity);
@Override
public CheckedFuture<Void, TransactionCommitFailedException> installFlow(FlowEntity flowEntity) {
- return installFlowInternal(flowEntity);
+ return toChecked(installFlowInternal(flowEntity),
+ t -> new TransactionCommitFailedException("installFlow failed", t));
}
@Override
public CheckedFuture<Void, TransactionCommitFailedException> installFlow(BigInteger dpId, Flow flowEntity) {
- return installFlowInternal(dpId, flowEntity);
+ return toChecked(installFlowInternal(dpId, flowEntity),
+ t -> new TransactionCommitFailedException("installFlow failed", t));
}
@Override
public CheckedFuture<Void, TransactionCommitFailedException> installFlow(BigInteger dpId, FlowEntity flowEntity) {
- return installFlowInternal(dpId, flowEntity.getFlowBuilder().build());
+ return toChecked(installFlowInternal(dpId, flowEntity.getFlowBuilder().build()),
+ t -> new TransactionCommitFailedException("installFlow failed", t));
}
@Override
@Override
public CheckedFuture<Void, TransactionCommitFailedException> removeFlow(BigInteger dpId, Flow flowEntity) {
- return removeFlowNewInternal(dpId, flowEntity);
+ return toChecked(removeFlowNewInternal(dpId, flowEntity),
+ t -> new TransactionCommitFailedException("removeFlow failed", t));
}
@Override
public CheckedFuture<Void, TransactionCommitFailedException> removeFlow(BigInteger dpId, FlowEntity flowEntity) {
- return removeFlowNewInternal(dpId, flowEntity.getFlowBuilder().build());
+ return toChecked(removeFlowNewInternal(dpId, flowEntity.getFlowBuilder().build()),
+ t -> new TransactionCommitFailedException("removeFlow failed", t));
}
@Override
public CheckedFuture<Void, TransactionCommitFailedException> removeFlow(FlowEntity flowEntity) {
- return removeFlowInternal(flowEntity);
+ return toChecked(removeFlowInternal(flowEntity),
+ t -> new TransactionCommitFailedException("removeFlow failed", t));
}
@Override
return bucketInstanceId;
}
+ private FluentFuture<Void> addCallBackForDeleteFlowAndReturnm(FluentFuture<Void> fluentFuture) {
+ return callBack(fluentFuture, "Delete Flow");
+ }
+
+ private FluentFuture<Void> addCallBackForInstallFlowAndReturn(FluentFuture<Void> fluentFuture) {
+ return callBack(fluentFuture, "Install Flow");
+ }
+
+ private FluentFuture<Void> addCallBackForInstallGroupAndReturn(FluentFuture<Void> fluentFuture) {
+ return callBack(fluentFuture, "Install Group");
+ }
+
+ // Generic for handling callbacks
+ private FluentFuture<Void> callBack(FluentFuture<Void> fluentFuture, String log) {
+ fluentFuture.addCallback(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ // Committed successfully
+ LOG.debug("{} -- Committedsuccessfully ", log);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ // Transaction failed
+
+ if (throwable instanceof OptimisticLockFailedException) {
+ // Failed because of concurrent transaction modifying same
+ // data
+ LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
+ } else {
+ // Some other type of TransactionCommitFailedException
+ LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
+ }
+ }
+ }, MoreExecutors.directExecutor());
+ return fluentFuture;
+ }
}