CDS: introduce AbstractThreePhaseCommitCohort 18/17618/8
authorRobert Varga <rovarga@cisco.com>
Thu, 2 Apr 2015 10:52:08 +0000 (12:52 +0200)
committerRobert Varga <rovarga@cisco.com>
Tue, 7 Apr 2015 13:32:52 +0000 (15:32 +0200)
This is a rework of ChainedTransactionProxy/TransactionProxy
interactions. Rather than invoking a callback from TransactionProxy to
ChainedTransactionProxy, wrap the ready() call in
ChainedTransactionProxy and extract the required information from the
returned cohort object. Also make sure we preallocate the cohort future
collection.

Change-Id: I539ffb085800e4d62f1eab5bc8dae284e73c6aab
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..cac0f51
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import java.util.List;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import scala.concurrent.Future;
+
+/**
+ * Abstract base class for {@link DOMStoreThreePhaseCommitCohort} instances returned by this
+ * implementation. In addition to the usual set of methods it also contains the list of actor
+ * futures.
+ */
+abstract class AbstractThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+    abstract List<Future<ActorSelection>> getCohortFutures();
+}
index c59a277fa867860355f00bc1ba47439fb98035e7..ed3aa85c1fc5cd56b1a8f05f88bf871a995b9804 100644 (file)
@@ -44,10 +44,12 @@ final class ChainedTransactionProxy extends TransactionProxy {
     }
 
     @Override
-    protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
+    public AbstractThreePhaseCommitCohort ready() {
+        final AbstractThreePhaseCommitCohort ret = super.ready();
+        readyFutures = ret.getCohortFutures();
         LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
-                readyFutures.size(), getTransactionChainId());
-        this.readyFutures = readyFutures;
+            readyFutures.size(), getTransactionChainId());
+        return ret;
     }
 
     /**
index 766cf1d578c5eab63fb8f8a59a3ebdbd3c4a0380..376b6580464cd08f8942d2b800d5fccb69b333f0 100644 (file)
@@ -7,14 +7,18 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSelection;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import java.util.Collections;
+import java.util.List;
+import scala.concurrent.Future;
 
 /**
- * A {@link DOMStoreThreePhaseCommitCohort} instance given out for empty transactions.
+ * A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}
+ * instance given out for empty transactions.
  */
-final class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort {
     static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
 
     private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null);
@@ -43,4 +47,9 @@ final class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseComm
     public ListenableFuture<Void> commit() {
         return IMMEDIATE_VOID_SUCCESS;
     }
-}
\ No newline at end of file
+
+    @Override
+    List<Future<ActorSelection>> getCohortFutures() {
+        return Collections.emptyList();
+    }
+}
index 0b826071ecbd2ec93f5ecbfaaaffe9c81ffefbd2..3a2bcf2336713d2af695065792d62358de4ce9c2 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -25,7 +24,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -34,7 +32,7 @@ import scala.runtime.AbstractFunction1;
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
  */
-public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
+public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort {
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
@@ -322,7 +320,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         }, actorContext.getClientDispatcher());
     }
 
-    @VisibleForTesting
+    @Override
     List<Future<ActorSelection>> getCohortFutures() {
         return Collections.unmodifiableList(cohortFutures);
     }
index a2a7a12044a74e23bc181f65e450e14548059bec..0b863623b4b9877cfc1553456994d241008f35ab 100644 (file)
@@ -21,7 +21,6 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +41,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -396,7 +394,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
-    public DOMStoreThreePhaseCommitCohort ready() {
+    public AbstractThreePhaseCommitCohort ready() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Read-only transactions cannot be readied");
 
@@ -407,15 +405,13 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                     txFutureCallbackMap.size());
 
         if (txFutureCallbackMap.isEmpty()) {
-            onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
         }
 
         throttleOperation(txFutureCallbackMap.size());
 
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-
+        List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
@@ -440,20 +436,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             cohortFutures.add(future);
         }
 
-        onTransactionReady(cohortFutures);
-
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
             getIdentifier().toString());
     }
 
-    /**
-     * Method for derived classes to be notified when the transaction has been readied.
-     *
-     * @param cohortFutures the cohort Futures for each shard transaction.
-     */
-    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
-    }
-
     @Override
     public void close() {
         if (!seal(TransactionState.CLOSED)) {