commandIterator = commands.iterator();
try {
while (commandIterator.hasNext()) {
- TransactionCommand command = commandIterator.next();
- synchronized (this) {
- final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
- transactionInFlight = transaction;
- recordPendingTransaction(command, transaction);
- command.execute(transaction);
- ListenableFuture<Void> ft = transaction.submit();
- command.setTransactionResultFuture(ft);
- Futures.addCallback(ft, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- forgetSuccessfulTransaction(transaction);
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- // NOOP - handled by failure of transaction chain
- }
- }, MoreExecutors.directExecutor());
- }
+ executeCommand(commandIterator.next());
}
transactionInFlight = null;
} catch (IllegalStateException e) {
}
}
+ private synchronized void executeCommand(final TransactionCommand command) {
+ final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+ transactionInFlight = transaction;
+ recordPendingTransaction(command, transaction);
+ command.execute(transaction);
+ ListenableFuture<Void> ft = transaction.submit();
+ command.setTransactionResultFuture(ft);
+ Futures.addCallback(ft, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ forgetSuccessfulTransaction(transaction);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ // NOOP - handled by failure of transaction chain
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
if (!failedTransactionQueue.offer(transaction)) {
LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
continue;
}
- ReadWriteTransaction transactionInFlight = null;
- try {
- for (TransactionCommand command: commands) {
- synchronized (this) {
- final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
- transactionInFlight = transaction;
- recordPendingTransaction(command, transaction);
- command.execute(transaction);
- Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- forgetSuccessfulTransaction(transaction);
- command.onSuccess();
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- command.onFailure(throwable);
- // NOOP - handled by failure of transaction chain
- }
- }, MoreExecutors.directExecutor());
- }
+ commands.forEach(this::executeCommand);
+ }
+ }
+
+ private synchronized void executeCommand(final TransactionCommand command) {
+ ReadWriteTransaction transactionInFlight = null;
+ try {
+ final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+ transactionInFlight = transaction;
+ recordPendingTransaction(command, transaction);
+ command.execute(transaction);
+ Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ forgetSuccessfulTransaction(transaction);
+ command.onSuccess();
}
- } catch (IllegalStateException e) {
- if (transactionInFlight != null) {
- // TODO: This method should distinguish exceptions on which the command should be
- // retried from exceptions on which the command should NOT be retried.
- // Then it should retry only the commands which should be retried, otherwise
- // this method will retry commands which will never be successful forever.
- offerFailedTransaction(transactionInFlight);
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ command.onFailure(throwable);
+ // NOOP - handled by failure of transaction chain
}
- LOG.warn("Failed to process an update notification from OVS.", e);
+ }, MoreExecutors.directExecutor());
+ } catch (IllegalStateException e) {
+ if (transactionInFlight != null) {
+ // TODO: This method should distinguish exceptions on which the command should be
+ // retried from exceptions on which the command should NOT be retried.
+ // Then it should retry only the commands which should be retried, otherwise
+ // this method will retry commands which will never be successful forever.
+ offerFailedTransaction(transactionInFlight);
}
+ LOG.warn("Failed to process an update notification from OVS.", e);
}
}