*/
package org.opendaylight.openflowplugin.common.txchain;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
-import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
import org.opendaylight.mdsal.binding.api.Transaction;
private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
+ private final ReadWriteLock readWriteTransactionLock = new ReentrantReadWriteLock();
private final Object txLock = new Object();
private final DataBroker dataBroker;
private final String nodeId;
@GuardedBy("txLock")
private boolean submitIsEnabled;
@GuardedBy("txLock")
- private FluentFuture<? extends CommitInfo> lastSubmittedFuture;
-
- private volatile boolean initCommit;
-
+ private FluentFuture<? extends CommitInfo> lastSubmittedFuture = CommitInfo.emptyFluentFuture();
@GuardedBy("txLock")
private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
- private ReadWriteLock readWriteTransactionLock = new ReentrantReadWriteLock();
- public TransactionChainManager(@NonNull final DataBroker dataBroker,
- @NonNull final String deviceIdentifier) {
- this.dataBroker = dataBroker;
- this.nodeId = deviceIdentifier;
- this.lastSubmittedFuture = CommitInfo.emptyFluentFuture();
+ private volatile boolean initCommit;
+
+ public TransactionChainManager(final DataBroker dataBroker, final String nodeId) {
+ this.dataBroker = requireNonNull(dataBroker);
+ this.nodeId = requireNonNull(nodeId);
}
@Holding("txLock")
public void activateTransactionManager() {
if (LOG.isDebugEnabled()) {
LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
- this.nodeId, submitIsEnabled);
+ nodeId, submitIsEnabled);
}
synchronized (txLock) {
if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
"TxChainFactory survive last close.");
Preconditions.checkState(writeTx == null,
"We have some unexpected WriteTransaction.");
- this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
- this.submitIsEnabled = false;
- this.initCommit = true;
+ transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
+ submitIsEnabled = false;
+ initCommit = true;
createTxChain();
}
}
*/
public FluentFuture<?> deactivateTransactionManager() {
if (LOG.isDebugEnabled()) {
- LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
+ LOG.debug("deactivateTransactionManager for node {}", nodeId);
}
final FluentFuture<? extends CommitInfo> future;
synchronized (txLock) {
}
@Override
- public void onFailure(@NonNull final Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
closeTransactionChain();
}
}, MoreExecutors.directExecutor());
@GuardedBy("txLock")
@SuppressWarnings("checkstyle:IllegalCatch")
- public boolean submitTransaction(boolean doSync) {
+ public boolean submitTransaction(final boolean doSync) {
synchronized (txLock) {
if (!submitIsEnabled) {
LOG.trace("transaction not committed - submit block issued");
}
Preconditions.checkState(TransactionChainManagerStatus.WORKING == transactionChainManagerStatus,
"we have here Uncompleted Transaction for node {} and we are not MASTER",
- this.nodeId);
+ nodeId);
final FluentFuture<? extends CommitInfo> submitFuture = writeTx.commit();
lastSubmittedFuture = submitFuture;
writeTx = null;
public void onFailure(final Throwable throwable) {
if (throwable instanceof InterruptedException || throwable instanceof ExecutionException) {
LOG.error("Transaction commit failed. ", throwable);
+ } else if (throwable instanceof CancellationException) {
+ LOG.warn("Submit task was canceled");
+ LOG.trace("Submit exception: ", throwable);
} else {
- if (throwable instanceof CancellationException) {
- LOG.warn("Submit task was canceled");
- LOG.trace("Submit exception: ", throwable);
- } else {
- LOG.error("Exception during transaction submitting. ", throwable);
- }
+ LOG.error("Exception during transaction submitting. ", throwable);
}
}
}, MoreExecutors.directExecutor());
synchronized (txLock) {
ensureTransaction();
if (writeTx == null) {
- LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
+ LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeId, path);
throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
}
synchronized (txLock) {
ensureTransaction();
if (writeTx == null) {
- LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
+ LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeId, path);
throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
}
synchronized (txLock) {
ensureTransaction();
if (writeTx == null) {
- LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
+ LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", nodeId, path);
throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
}
synchronized (txLock) {
ensureTransaction();
if (writeTx == null) {
- LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
+ LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", nodeId, path);
throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
}
final Transaction transaction, final Throwable cause) {
synchronized (txLock) {
if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
- && chain.equals(this.transactionChain)) {
+ && chain.equals(transactionChain)) {
LOG.warn("Transaction chain failed, recreating chain due to ", cause);
closeTransactionChain();
createTxChain();
public FluentFuture<?> shuttingDown() {
if (LOG.isDebugEnabled()) {
- LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
+ LOG.debug("TxManager is going SHUTTING_DOWN for node {}", nodeId);
}
synchronized (txLock) {
- this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
+ transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
return txChainShuttingDown();
}
}
future = lastSubmittedFuture;
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Submitting all transactions for Node {}", this.nodeId);
+ LOG.debug("Submitting all transactions for Node {}", nodeId);
}
// hijack md-sal thread
future = writeTx.commit();
@Override
public void close() {
if (LOG.isDebugEnabled()) {
- LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
+ LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", nodeId);
}
synchronized (txLock) {
closeTransactionChain();