BUG-5626: split out CohortEntry 87/38587/3
authorRobert Varga <rovarga@cisco.com>
Mon, 9 May 2016 15:17:18 +0000 (17:17 +0200)
committerRobert Varga <rovarga@cisco.com>
Wed, 11 May 2016 09:06:22 +0000 (11:06 +0200)
This is a large static class, move it to its own file.

Change-Id: I79c161ec903369b969e649ddb37f96e62a8da829
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java
new file mode 100644 (file)
index 0000000..0fa6788
--- /dev/null
@@ -0,0 +1,206 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
+
+final class CohortEntry {
+    enum State {
+        PENDING,
+        CAN_COMMITTED,
+        PRE_COMMITTED,
+        COMMITTED,
+        ABORTED
+    }
+
+    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+
+    private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
+    private final ReadWriteShardDataTreeTransaction transaction;
+    private final String transactionID;
+    private final CompositeDataTreeCohort userCohorts;
+    private final short clientVersion;
+
+    private State state = State.PENDING;
+    private RuntimeException lastBatchedModificationsException;
+    private int totalBatchedModificationsReceived;
+    private ShardDataTreeCohort cohort;
+    private boolean doImmediateCommit;
+    private ActorRef replySender;
+    private Shard shard;
+
+    CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
+            DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
+        this.transaction = Preconditions.checkNotNull(transaction);
+        this.transactionID = transactionID;
+        this.clientVersion = clientVersion;
+        this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
+    }
+
+    CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
+            SchemaContext schema, short clientVersion) {
+        this.transactionID = transactionID;
+        this.cohort = cohort;
+        this.transaction = null;
+        this.clientVersion = clientVersion;
+        this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
+    }
+
+    void updateLastAccessTime() {
+        lastAccessTimer.reset();
+        lastAccessTimer.start();
+    }
+
+    String getTransactionID() {
+        return transactionID;
+    }
+
+    short getClientVersion() {
+        return clientVersion;
+    }
+
+    State getState() {
+        return state;
+    }
+
+    DataTreeCandidate getCandidate() {
+        return cohort.getCandidate();
+    }
+
+    DataTreeModification getDataTreeModification() {
+        return cohort.getDataTreeModification();
+    }
+
+    ReadWriteShardDataTreeTransaction getTransaction() {
+        return transaction;
+    }
+
+    int getTotalBatchedModificationsReceived() {
+        return totalBatchedModificationsReceived;
+    }
+
+    RuntimeException getLastBatchedModificationsException() {
+        return lastBatchedModificationsException;
+    }
+
+    void applyModifications(Iterable<Modification> modifications) {
+        totalBatchedModificationsReceived++;
+        if(lastBatchedModificationsException == null) {
+            for (Modification modification : modifications) {
+                    try {
+                        modification.apply(transaction.getSnapshot());
+                    } catch (RuntimeException e) {
+                        lastBatchedModificationsException = e;
+                        throw e;
+                    }
+            }
+        }
+    }
+
+    boolean canCommit() throws InterruptedException, ExecutionException {
+        state = State.CAN_COMMITTED;
+
+        // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
+        // about possibly accessing our state on a different thread outside of our dispatcher.
+        // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
+        // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
+        // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
+        return cohort.canCommit().get();
+    }
+
+
+
+    void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
+        state = State.PRE_COMMITTED;
+        cohort.preCommit().get();
+        userCohorts.canCommit(cohort.getCandidate());
+        userCohorts.preCommit();
+    }
+
+    void commit() throws InterruptedException, ExecutionException, TimeoutException {
+        state = State.COMMITTED;
+        cohort.commit().get();
+        userCohorts.commit();
+    }
+
+    void abort() throws InterruptedException, ExecutionException, TimeoutException {
+        state = State.ABORTED;
+        cohort.abort().get();
+        userCohorts.abort();
+    }
+
+    void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
+        Preconditions.checkState(cohort == null, "cohort was already set");
+
+        setDoImmediateCommit(doImmediateCommit);
+
+        cohort = transaction.ready();
+
+        if(cohortDecorator != null) {
+            // Call the hook for unit tests.
+            cohort = cohortDecorator.decorate(transactionID, cohort);
+        }
+    }
+
+    boolean isReadyToCommit() {
+        return replySender != null;
+    }
+
+    boolean isExpired(long expireTimeInMillis) {
+        return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
+    }
+
+    boolean isDoImmediateCommit() {
+        return doImmediateCommit;
+    }
+
+    void setDoImmediateCommit(boolean doImmediateCommit) {
+        this.doImmediateCommit = doImmediateCommit;
+    }
+
+    ActorRef getReplySender() {
+        return replySender;
+    }
+
+    void setReplySender(ActorRef replySender) {
+        this.replySender = replySender;
+    }
+
+    Shard getShard() {
+        return shard;
+    }
+
+    void setShard(Shard shard) {
+        this.shard = shard;
+    }
+
+
+    boolean isAborted() {
+        return state == State.ABORTED;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder builder = new StringBuilder();
+        builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
+                .append(doImmediateCommit).append("]");
+        return builder.toString();
+    }
+}
\ No newline at end of file
index 23ad747d4cf3d4c7247271c09f5c64fa9fc44c2e..7165581d8103fc64148e745e477bc1ec7f2278d6 100644 (file)
@@ -27,7 +27,6 @@ import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
index 45fa7727e4f125e11d73658c699ecbc779715ceb..f313329c7070a1fd582bf045888321493ee4074a 100644 (file)
@@ -10,10 +10,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import akka.actor.ActorRef;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
-import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -24,10 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
@@ -37,21 +32,17 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
-import scala.concurrent.duration.Duration;
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
  *
  * @author Thomas Pantelis
  */
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
  *
  * @author Thomas Pantelis
  */
-class ShardCommitCoordinator {
+final class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
@@ -85,9 +76,6 @@ class ShardCommitCoordinator {
 
     private Runnable runOnPendingTransactionsComplete;
 
 
     private Runnable runOnPendingTransactionsComplete;
 
-
-    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
-
     ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
             String name) {
 
     ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
             String name) {
 
@@ -566,12 +554,12 @@ class ShardCommitCoordinator {
                 last.setTotalMessagesSent(newModifications.size());
                 messages.addAll(newModifications);
 
                 last.setTotalMessagesSent(newModifications.size());
                 messages.addAll(newModifications);
 
-                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.CAN_COMMITTED) {
+                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
                     messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
                             cohortEntry.getClientVersion()));
                 }
 
                     messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
                             cohortEntry.getClientVersion()));
                 }
 
-                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.PRE_COMMITTED) {
+                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
                     messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
                             cohortEntry.getClientVersion()));
                 }
                     messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
                             cohortEntry.getClientVersion()));
                 }
@@ -692,185 +680,4 @@ class ShardCommitCoordinator {
    void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
    void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
         cohortRegistry.process(sender, message);
     }
-
-    static class CohortEntry {
-        enum State {
-            PENDING,
-            CAN_COMMITTED,
-            PRE_COMMITTED,
-            COMMITTED,
-            ABORTED
-    }
-
-        private final String transactionID;
-        private ShardDataTreeCohort cohort;
-        private final ReadWriteShardDataTreeTransaction transaction;
-        private RuntimeException lastBatchedModificationsException;
-        private ActorRef replySender;
-        private Shard shard;
-        private boolean doImmediateCommit;
-        private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
-        private int totalBatchedModificationsReceived;
-        private State state = State.PENDING;
-        private final short clientVersion;
-        private final CompositeDataTreeCohort userCohorts;
-
-        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
-                DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
-            this.transaction = Preconditions.checkNotNull(transaction);
-            this.transactionID = transactionID;
-            this.clientVersion = clientVersion;
-            this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
-        }
-
-        CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
-                SchemaContext schema, short clientVersion) {
-            this.transactionID = transactionID;
-            this.cohort = cohort;
-            this.transaction = null;
-            this.clientVersion = clientVersion;
-            this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
-        }
-
-        void updateLastAccessTime() {
-            lastAccessTimer.reset();
-            lastAccessTimer.start();
-        }
-
-        String getTransactionID() {
-            return transactionID;
-        }
-
-        short getClientVersion() {
-            return clientVersion;
-        }
-
-        State getState() {
-            return state;
-        }
-
-        DataTreeCandidate getCandidate() {
-            return cohort.getCandidate();
-        }
-
-        DataTreeModification getDataTreeModification() {
-            return cohort.getDataTreeModification();
-        }
-
-        ReadWriteShardDataTreeTransaction getTransaction() {
-            return transaction;
-        }
-
-        int getTotalBatchedModificationsReceived() {
-            return totalBatchedModificationsReceived;
-        }
-
-        RuntimeException getLastBatchedModificationsException() {
-            return lastBatchedModificationsException;
-        }
-
-        void applyModifications(Iterable<Modification> modifications) {
-            totalBatchedModificationsReceived++;
-            if(lastBatchedModificationsException == null) {
-                for (Modification modification : modifications) {
-                        try {
-                            modification.apply(transaction.getSnapshot());
-                        } catch (RuntimeException e) {
-                            lastBatchedModificationsException = e;
-                            throw e;
-                        }
-                }
-            }
-        }
-
-        boolean canCommit() throws InterruptedException, ExecutionException {
-            state = State.CAN_COMMITTED;
-
-            // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
-            // about possibly accessing our state on a different thread outside of our dispatcher.
-            // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
-            // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
-            // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
-            return cohort.canCommit().get();
-        }
-
-
-
-        void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
-            state = State.PRE_COMMITTED;
-            cohort.preCommit().get();
-            userCohorts.canCommit(cohort.getCandidate());
-            userCohorts.preCommit();
-        }
-
-        void commit() throws InterruptedException, ExecutionException, TimeoutException {
-            state = State.COMMITTED;
-            cohort.commit().get();
-            userCohorts.commit();
-        }
-
-        void abort() throws InterruptedException, ExecutionException, TimeoutException {
-            state = State.ABORTED;
-            cohort.abort().get();
-            userCohorts.abort();
-        }
-
-        void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
-            Preconditions.checkState(cohort == null, "cohort was already set");
-
-            setDoImmediateCommit(doImmediateCommit);
-
-            cohort = transaction.ready();
-
-            if(cohortDecorator != null) {
-                // Call the hook for unit tests.
-                cohort = cohortDecorator.decorate(transactionID, cohort);
-            }
-        }
-
-        boolean isReadyToCommit() {
-            return replySender != null;
-        }
-
-        boolean isExpired(long expireTimeInMillis) {
-            return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
-        }
-
-        boolean isDoImmediateCommit() {
-            return doImmediateCommit;
-        }
-
-        void setDoImmediateCommit(boolean doImmediateCommit) {
-            this.doImmediateCommit = doImmediateCommit;
-        }
-
-        ActorRef getReplySender() {
-            return replySender;
-        }
-
-        void setReplySender(ActorRef replySender) {
-            this.replySender = replySender;
-        }
-
-        Shard getShard() {
-            return shard;
-        }
-
-        void setShard(Shard shard) {
-            this.shard = shard;
-        }
-
-
-        boolean isAborted() {
-            return state == State.ABORTED;
-        }
-
-        @Override
-        public String toString() {
-            final StringBuilder builder = new StringBuilder();
-            builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
-                    .append(doImmediateCommit).append("]");
-            return builder.toString();
-        }
-    }
 }
 }