Split out ChainedTransactionProxy 30/17530/10
authorRobert Varga <rovarga@cisco.com>
Wed, 1 Apr 2015 10:47:27 +0000 (12:47 +0200)
committerRobert Varga <rovarga@cisco.com>
Thu, 2 Apr 2015 08:24:09 +0000 (10:24 +0200)
Rather than having it as a inner class, move it into its own file,
increasing clarity.

Change-Id: I83d667db353cc01a950ac24da7ee68fc66ceb46d
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java
new file mode 100644 (file)
index 0000000..c59a277
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import akka.dispatch.OnComplete;
+import java.util.List;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+final class ChainedTransactionProxy extends TransactionProxy {
+    private static final Logger LOG = LoggerFactory.getLogger(ChainedTransactionProxy.class);
+
+    /**
+     * Stores the ready Futures from the previous Tx in the chain.
+     */
+    private final List<Future<ActorSelection>> previousReadyFutures;
+
+    /**
+     * Stores the ready Futures from this transaction when it is readied.
+     */
+    private volatile List<Future<ActorSelection>> readyFutures;
+
+    ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
+            String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
+        super(actorContext, transactionType, transactionChainId);
+        this.previousReadyFutures = previousReadyFutures;
+    }
+
+    List<Future<ActorSelection>> getReadyFutures() {
+        return readyFutures;
+    }
+
+    boolean isReady() {
+        return readyFutures != null;
+    }
+
+    @Override
+    protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
+        LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
+                readyFutures.size(), getTransactionChainId());
+        this.readyFutures = readyFutures;
+    }
+
+    /**
+     * This method is overridden to ensure the previous Tx's ready operations complete
+     * before we initiate the next Tx in the chain to avoid creation failures if the
+     * previous Tx's ready operations haven't completed yet.
+     */
+    @Override
+    protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
+        // Check if there are any previous ready Futures, otherwise let the super class handle it.
+        if(previousReadyFutures.isEmpty()) {
+            return super.sendFindPrimaryShardAsync(shardName);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
+                    previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
+        }
+
+        // Combine the ready Futures into 1.
+        Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
+                previousReadyFutures, getActorContext().getClientDispatcher());
+
+        // Add a callback for completion of the combined Futures.
+        final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
+        OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
+            @Override
+            public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
+                if(failure != null) {
+                    // A Ready Future failed so fail the returned Promise.
+                    returnPromise.failure(failure);
+                } else {
+                    LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
+                            getIdentifier(), getTransactionChainId());
+
+                    // Send the FindPrimaryShard message and use the resulting Future to complete the
+                    // returned Promise.
+                    returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
+                }
+            }
+        };
+
+        combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
+
+        return returnPromise.future();
+    }
+}
\ No newline at end of file
index 58ac1d8b8265bc50fb7d38dea1dd9c1b916211fc..11066edd543413de08591102ba2541d7baec9a0f 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
@@ -21,18 +20,13 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
-import scala.concurrent.Promise;
 
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
 public class TransactionChainProxy implements DOMStoreTransactionChain {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
-
     private interface State {
         boolean isReady();
 
@@ -139,83 +133,4 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
     private void checkReadyState(State state) {
         Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet");
     }
-
-    private static class ChainedTransactionProxy extends TransactionProxy {
-
-        /**
-         * Stores the ready Futures from the previous Tx in the chain.
-         */
-        private final List<Future<ActorSelection>> previousReadyFutures;
-
-        /**
-         * Stores the ready Futures from this transaction when it is readied.
-         */
-        private volatile List<Future<ActorSelection>> readyFutures;
-
-        private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
-                String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
-            super(actorContext, transactionType, transactionChainId);
-            this.previousReadyFutures = previousReadyFutures;
-        }
-
-        List<Future<ActorSelection>> getReadyFutures() {
-            return readyFutures;
-        }
-
-        boolean isReady() {
-            return readyFutures != null;
-        }
-
-        @Override
-        protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
-            LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
-                    readyFutures.size(), getTransactionChainId());
-            this.readyFutures = readyFutures;
-        }
-
-        /**
-         * This method is overridden to ensure the previous Tx's ready operations complete
-         * before we initiate the next Tx in the chain to avoid creation failures if the
-         * previous Tx's ready operations haven't completed yet.
-         */
-        @Override
-        protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
-            // Check if there are any previous ready Futures, otherwise let the super class handle it.
-            if(previousReadyFutures.isEmpty()) {
-                return super.sendFindPrimaryShardAsync(shardName);
-            }
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
-                        previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
-            }
-
-            // Combine the ready Futures into 1.
-            Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
-                    previousReadyFutures, getActorContext().getClientDispatcher());
-
-            // Add a callback for completion of the combined Futures.
-            final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
-            OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
-                @Override
-                public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
-                    if(failure != null) {
-                        // A Ready Future failed so fail the returned Promise.
-                        returnPromise.failure(failure);
-                    } else {
-                        LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
-                                getIdentifier(), getTransactionChainId());
-
-                        // Send the FindPrimaryShard message and use the resulting Future to complete the
-                        // returned Promise.
-                        returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
-                    }
-                }
-            };
-
-            combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
-
-            return returnPromise.future();
-        }
-    }
 }