Bug 499: Improved data change listener tree management
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMDataBrokerImpl.java
index 313a2c3d9cb612941015f57edbaa995431c1d6d6..fc87a9110576054eb75571167d01f60c1c95c98e 100644 (file)
@@ -10,10 +10,12 @@ package org.opendaylight.controller.md.sal.dom.broker.impl;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
@@ -23,6 +25,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -30,6 +33,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -45,12 +49,13 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
-public class DOMDataBrokerImpl implements DOMDataBroker {
+public class DOMDataBrokerImpl implements DOMDataBroker, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerImpl.class);
     private static final Logger COORDINATOR_LOG = LoggerFactory.getLogger(CommitCoordination.class);
     private final ImmutableMap<LogicalDatastoreType, DOMStore> datastores;
     private final ListeningExecutorService executor;
+    private final AtomicLong txNum = new AtomicLong();
 
     public DOMDataBrokerImpl(final ImmutableMap<LogicalDatastoreType, DOMStore> datastores,
             final ListeningExecutorService executor) {
@@ -83,7 +88,7 @@ public class DOMDataBrokerImpl implements DOMDataBroker {
     }
 
     private Object newTransactionIdentifier() {
-        return new Object();
+        return "DOM-" + txNum.getAndIncrement();
     }
 
     @Override
@@ -115,6 +120,7 @@ public class DOMDataBrokerImpl implements DOMDataBroker {
 
     private ListenableFuture<RpcResult<TransactionStatus>> submit(
             final WriteTransactionImpl<? extends DOMStoreWriteTransaction> transaction) {
+        LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
         return executor.submit(new CommitCoordination(transaction));
     }
 
@@ -245,6 +251,12 @@ public class DOMDataBrokerImpl implements DOMDataBroker {
                 final InstanceIdentifier path) {
             return getSubtransaction(store).read(path);
         }
+
+        @Override
+        public void merge(final LogicalDatastoreType store, final InstanceIdentifier path,
+                final NormalizedNode<?, ?> data) {
+
+        }
     }
 
     private final class CommitCoordination implements Callable<RpcResult<TransactionStatus>> {
@@ -258,24 +270,40 @@ public class DOMDataBrokerImpl implements DOMDataBroker {
         @Override
         public RpcResult<TransactionStatus> call() throws Exception {
 
-            Boolean canCommit = canCommit().get();
+            try {
+                Boolean canCommit = canCommit().get();
 
-            if (canCommit) {
-                try {
-                    preCommit().get();
+                if (canCommit) {
                     try {
-                        commit().get();
+                        preCommit().get();
+                        try {
+                            commit().get();
+                            COORDINATOR_LOG.debug("Tx: {} Is commited.", transaction.getIdentifier());
+                            return Rpcs.getRpcResult(true, TransactionStatus.COMMITED,
+                                    Collections.<RpcError> emptySet());
+
+                        } catch (InterruptedException | ExecutionException e) {
+                            COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e);
+                        }
+
                     } catch (InterruptedException | ExecutionException e) {
-                        // ERROR
+                        COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort",
+                                transaction.getIdentifier(), e);
                     }
-
-                } catch (InterruptedException | ExecutionException e) {
+                } else {
+                    COORDINATOR_LOG.info("Tx: {} Did not pass canCommit phase.", transaction.getIdentifier());
                     abort().get();
                 }
-            } else {
+            } catch (InterruptedException | ExecutionException e) {
+                COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e);
+
+            }
+            try {
                 abort().get();
+            } catch (InterruptedException | ExecutionException e) {
+                COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e);
             }
-            return null;
+            return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.<RpcError> emptySet());
         }
 
         public ListenableFuture<Void> preCommit() {
@@ -317,4 +345,9 @@ public class DOMDataBrokerImpl implements DOMDataBroker {
 
     }
 
+    @Override
+    public void close() throws Exception {
+
+    }
+
 }