BUG-1845: implement proper shutdown sequence 31/11331/1
authorRobert Varga <rovarga@cisco.com>
Thu, 18 Sep 2014 17:35:52 +0000 (19:35 +0200)
committerRobert Varga <rovarga@cisco.com>
Thu, 18 Sep 2014 17:37:36 +0000 (19:37 +0200)
The notification chain should only invoke the successful callback once
all transactions have been committed successfully. This patch does
precisely that by tracking the number of outstanding transactions. Since
this requires notification about successful transactions, too, we get
rid of the unneeded DOMDataCommitErrorInvoker/Listener abstraction.

Change-Id: I38534a3fb79a8a461059504de7b0cdd48348afef
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitExecutor.java

index 8ed52061328bab92eb9074fabc9df010f7f0a9d7..5fbf1270cc7110133637cdb70016f44356a388a3 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
 import static com.google.common.base.Preconditions.checkState;
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.EnumMap;
@@ -102,6 +101,6 @@ public class DOMDataBrokerImpl extends AbstractDOMForwardedTransactionFactory<DO
     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
             final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
         LOG.debug("Transaction: {} submitted with cohorts {}.", transaction.getIdentifier(), cohorts);
-        return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> absent());
+        return coordinator.submit(transaction, cohorts);
     }
 }
index 7cd6afa466e7d57b57f6861ac12aabf30bf90347..0b1dd1c5e0df481773a598ab3d99fa814e448187 100644 (file)
@@ -6,11 +6,14 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@@ -27,16 +30,27 @@ import org.slf4j.LoggerFactory;
  * {@link org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType} type.
  *
  */
-public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
-        implements DOMTransactionChain, DOMDataCommitErrorListener {
+final class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory<DOMStoreTransactionChain>
+        implements DOMTransactionChain {
+    private static enum State {
+        RUNNING,
+        CLOSING,
+        CLOSED,
+        FAILED,
+    }
 
+    private static final AtomicIntegerFieldUpdater<DOMDataBrokerTransactionChainImpl> COUNTER_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, "counter");
+    private static final AtomicReferenceFieldUpdater<DOMDataBrokerTransactionChainImpl, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, State.class, "state");
     private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class);
     private final AtomicLong txNum = new AtomicLong();
     private final DOMDataCommitExecutor coordinator;
     private final TransactionChainListener listener;
     private final long chainId;
 
-    private volatile boolean failed = false;
+    private volatile State state = State.RUNNING;
+    private volatile int counter = 0;
 
     /**
      *
@@ -62,37 +76,70 @@ public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTrans
         this.listener = Preconditions.checkNotNull(listener);
     }
 
+    private void checkNotFailed() {
+        Preconditions.checkState(state != State.FAILED, "Transaction chain has failed");
+    }
+
     @Override
     protected Object newTransactionIdentifier() {
         return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
     }
 
     @Override
-    public CheckedFuture<Void,TransactionCommitFailedException> submit(
+    public CheckedFuture<Void, TransactionCommitFailedException> submit(
             final DOMDataWriteTransaction transaction, final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
+        checkNotFailed();
         checkNotClosed();
 
-        return coordinator.submit(transaction, cohorts, Optional.<DOMDataCommitErrorListener> of(this));
+        final CheckedFuture<Void, TransactionCommitFailedException> ret = coordinator.submit(transaction, cohorts);
+
+        COUNTER_UPDATER.incrementAndGet(this);
+        Futures.addCallback(ret, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                transactionCompleted();
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                transactionFailed(transaction, t);
+            }
+        });
+
+        return ret;
     }
 
     @Override
     public void close() {
-        super.close();
+        final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING);
+        if (!success) {
+            LOG.debug("Chain {} is no longer running", this);
+            return;
+        }
 
+        super.close();
         for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
             subChain.close();
         }
 
-        if (!failed) {
-            LOG.debug("Transaction chain {} successfully finished.", this);
-            // FIXME: this event should be emitted once all operations complete
-            listener.onTransactionChainSuccessful(this);
+        if (counter == 0) {
+            finishClose();
         }
     }
 
-    @Override
-    public void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
-        failed = true;
+    private void finishClose() {
+        state = State.CLOSED;
+        listener.onTransactionChainSuccessful(this);
+    }
+
+    private void transactionCompleted() {
+        if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) {
+            finishClose();
+        }
+    }
+
+    private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+        state = State.FAILED;
         LOG.debug("Transaction chain {} failed.", this, cause);
         listener.onTransactionChainFailed(this, tx, cause);
     }
index 77cf105ed6a6e676819593dd19ccfcdc8580897d..15d7b1d966e1a59e028616bb85f5670f5805860b 100644 (file)
@@ -6,7 +6,6 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -63,16 +62,15 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
     @Override
     public CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
-            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts, final Optional<DOMDataCommitErrorListener> listener) {
+            final Iterable<DOMStoreThreePhaseCommitCohort> cohorts) {
         Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
-        Preconditions.checkArgument(listener != null, "Listener must not be null");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
 
         ListenableFuture<Void> commitFuture = null;
         try {
             commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts,
-                    listener, commitStatsTracker));
+                    commitStatsTracker));
         } catch(RejectedExecutionException e) {
             LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
                       executor, e);
@@ -81,10 +79,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
                         "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
         }
 
-        if (listener.isPresent()) {
-            Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
-        }
-
         return MappingCheckedFuture.create(commitFuture,
                 TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
     }
@@ -141,7 +135,6 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
 
         public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
                 final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
-                final Optional<DOMDataCommitErrorListener> listener,
                 final DurationStatsTracker commitStatTracker) {
             this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
             this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorInvoker.java
deleted file mode 100644 (file)
index 5ce9241..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-
-/**
- *
- * Utility implemetation of {@link FutureCallback} which is responsible
- * for invoking {@link DOMDataCommitErrorListener} on TransactionCommit failed.
- *
- * When {@link #onFailure(Throwable)} is invoked, supplied {@link DOMDataCommitErrorListener}
- * callback is invoked with associated transaction and throwable is invoked on listener.
- *
- */
-class DOMDataCommitErrorInvoker implements FutureCallback<Void> {
-
-    private final DOMDataWriteTransaction tx;
-    private final DOMDataCommitErrorListener listener;
-
-
-    /**
-     *
-     * Construct new DOMDataCommitErrorInvoker.
-     *
-     * @param transaction Transaction which should be passed as argument to {@link DOMDataCommitErrorListener#onCommitFailed(DOMDataWriteTransaction, Throwable)}
-     * @param listener Listener which should be invoked on error.
-     */
-    public DOMDataCommitErrorInvoker(DOMDataWriteTransaction transaction, DOMDataCommitErrorListener listener) {
-        this.tx = Preconditions.checkNotNull(transaction, "Transaction must not be null");
-        this.listener = Preconditions.checkNotNull(listener, "Listener must not be null");
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-        listener.onCommitFailed(tx, t);
-    }
-
-    @Override
-    public void onSuccess(Void result) {
-        // NOOP
-    }
-}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitErrorListener.java
deleted file mode 100644 (file)
index 80bc669..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.broker.impl;
-
-import java.util.EventListener;
-
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-
-/**
- *
- * Listener on transaction failure which may be passed to
- * {@link DOMDataCommitExecutor}. This listener is notified during transaction
- * processing, before result is delivered to other client code outside MD-SAL.
- * This allows implementors to update their internal state before transaction
- * failure is visible to client code.
- *
- * This is internal API for MD-SAL implementations, for consumer facing error
- * listeners see {@link org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener}.
- *
- */
-interface DOMDataCommitErrorListener extends EventListener {
-
-    /**
-     *
-     * Callback which is invoked on transaction failure during three phase
-     * commit in {@link DOMDataCommitExecutor}.
-     *
-     *
-     * Implementation of this callback MUST NOT do any blocking calls or any
-     * calls to MD-SAL, since this callback is invoked synchronously on MD-SAL
-     * Broker coordination thread.
-     *
-     * @param tx
-     *            Transaction which failed
-     * @param cause
-     *            Failure reason
-     */
-    void onCommitFailed(DOMDataWriteTransaction tx, Throwable cause);
-
-}
index 234758ca75413e3381cf5b71fd67c2e2f41e815a..8aa97e72d1156fb1a8b19d1e3ca62ff715d11a95 100644 (file)
@@ -7,11 +7,10 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
+import com.google.common.util.concurrent.CheckedFuture;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
 
 /**
  * Executor of Three Phase Commit coordination for
@@ -35,15 +34,13 @@ interface DOMDataCommitExecutor {
      *            Transaction to be used as context for reporting
      * @param cohort
      *            DOM Store cohorts representing provided transaction, its
-     *            subtransactoins.
-     * @param listener
-     *            Error listener which should be notified if transaction failed.
+     *            subtransactions.
      * @return a CheckedFuture. if commit coordination on cohorts finished successfully,
      *         nothing is returned from the Future, On failure,
      *         the Future fails with a {@link TransactionCommitFailedException}.
      *
      */
     CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction tx,
-            Iterable<DOMStoreThreePhaseCommitCohort> cohort, Optional<DOMDataCommitErrorListener> listener);
+            Iterable<DOMStoreThreePhaseCommitCohort> cohort);
 
 }