BUG-5280: implement backend message handling 32/41032/75
authorRobert Varga <rovarga@cisco.com>
Wed, 29 Jun 2016 16:08:11 +0000 (18:08 +0200)
committerRobert Varga <rovarga@cisco.com>
Sat, 19 Nov 2016 15:11:41 +0000 (16:11 +0100)
This patch adds message routing on the backend so messages
earlier in the patch series get handled correctly.

Change-Id: Ie0ecfc1c8ce3c3b52b5b9c4986dd01444c2a719a
Signed-off-by: Robert Varga <rovarga@cisco.com>
31 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccess.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccessProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/NotLeaderException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/OutOfOrderRequestException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/UnknownHistoryException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestException.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RuntimeRequestException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/UnsupportedRequestException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java

diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java
new file mode 100644 (file)
index 0000000..562c0e5
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received a request to create a transaction which has
+ * already been purged.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class DeadTransactionException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    public DeadTransactionException(final long lastSeenTransaction) {
+        super("Transaction up to " + Long.toUnsignedString(lastSeenTransaction) + " are accounted for");
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return true;
+    }
+}
index c74711618cd9533b23119ff50b5ba3feb402eaf4..2955ed2b65eeac88e8dd9dd8d28a2ad42ff774d9 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.commands;
 
 import akka.actor.ActorRef;
 import com.google.common.annotations.Beta;
 
 import akka.actor.ActorRef;
 import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Request;
@@ -28,6 +29,7 @@ public abstract class LocalHistoryRequest<T extends LocalHistoryRequest<T>> exte
 
     LocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
         super(target, sequence, replyTo);
 
     LocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) {
         super(target, sequence, replyTo);
+        Preconditions.checkArgument(target.getHistoryId() != 0, "History identifier must be non-zero");
     }
 
     LocalHistoryRequest(final T request, final ABIVersion version) {
     }
 
     LocalHistoryRequest(final T request, final ABIVersion version) {
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccess.java
new file mode 100644 (file)
index 0000000..c4dd20d
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Response to a {@link ModifyTransactionRequest} which does not have a {@link PersistenceProtocol}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ModifyTransactionSuccess extends TransactionSuccess<ModifyTransactionSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public ModifyTransactionSuccess(final TransactionIdentifier identifier, final long sequence) {
+        super(identifier, sequence);
+    }
+
+    private ModifyTransactionSuccess(final ModifyTransactionSuccess success, final ABIVersion version) {
+        super(success, version);
+    }
+
+    @Override
+    protected AbstractTransactionSuccessProxy<ModifyTransactionSuccess> externalizableProxy(final ABIVersion version) {
+        return new ModifyTransactionSuccessProxyV1(this);
+    }
+
+    @Override
+    protected ModifyTransactionSuccess cloneAsVersion(final ABIVersion version) {
+        return new ModifyTransactionSuccess(this, version);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccessProxyV1.java
new file mode 100644 (file)
index 0000000..0efff09
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link ModifyTransactionSuccess}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class ModifyTransactionSuccessProxyV1 extends AbstractTransactionSuccessProxy<ModifyTransactionSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+    // be able to create instances via reflection.
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public ModifyTransactionSuccessProxyV1() {
+        // For Externalizable
+    }
+
+    ModifyTransactionSuccessProxyV1(final ModifyTransactionSuccess success) {
+        super(success);
+    }
+
+    @Override
+    protected ModifyTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) {
+        return new ModifyTransactionSuccess(target, sequence);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/NotLeaderException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/NotLeaderException.java
new file mode 100644 (file)
index 0000000..0864cd0
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * General error raised when the recipient of a Request is not the correct backend to talk to. This typically
+ * means that the backend processing has moved and the frontend needs to run rediscovery and retry the request.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class NotLeaderException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    public NotLeaderException(final ActorRef me) {
+        super("Actor " + me + " is not the current leader");
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return false;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/OutOfOrderRequestException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/OutOfOrderRequestException.java
new file mode 100644 (file)
index 0000000..07a3dba
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received an unexpected request. This would typically
+ * mean that messages are not arriving in the sequence they were generated by the frontend.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class OutOfOrderRequestException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    public OutOfOrderRequestException(final long expectedRequest) {
+        super("Expecting request " + Long.toUnsignedString(expectedRequest));
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return true;
+    }
+}
index e15c2dc7c40f8b5fb6c7680d0c513a9271aec4b3..636a2e741bd42f8fcd5471c92a794a21ae3b2528 100644 (file)
@@ -29,6 +29,10 @@ public abstract class TransactionSuccess<T extends TransactionSuccess<T>>
         super(identifier, sequence);
     }
 
         super(identifier, sequence);
     }
 
+    TransactionSuccess(final T success, final ABIVersion version) {
+        super(success, version);
+    }
+
     @Override
     protected abstract AbstractTransactionSuccessProxy<T> externalizableProxy(ABIVersion version);
 }
     @Override
     protected abstract AbstractTransactionSuccessProxy<T> externalizableProxy(ABIVersion version);
 }
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/UnknownHistoryException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/UnknownHistoryException.java
new file mode 100644 (file)
index 0000000..196c60c
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received a request referencing an unknown history. This
+ * typically happens when the linear history ID is newer than the highest observed {@link CreateLocalHistoryRequest}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class UnknownHistoryException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    public UnknownHistoryException(final Long lastSeenHistory) {
+        super("Last known history is " + historyToString(lastSeenHistory));
+    }
+
+    private static String historyToString(final Long history) {
+        return history == null ? "null" : Long.toUnsignedString(history.longValue());
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return true;
+    }
+}
index f3b3d734212ed5daab539f07dff0bb242ed30a7b..5f525b0a9beb8e60174367f4798121d89a8200a6 100644 (file)
@@ -24,7 +24,7 @@ public abstract class RequestException extends Exception {
         super(Preconditions.checkNotNull(message));
     }
 
         super(Preconditions.checkNotNull(message));
     }
 
-    protected RequestException(@Nonnull final String message, @Nonnull final Exception cause) {
+    protected RequestException(@Nonnull final String message, @Nonnull final Throwable cause) {
         super(Preconditions.checkNotNull(message), Preconditions.checkNotNull(cause));
     }
 
         super(Preconditions.checkNotNull(message), Preconditions.checkNotNull(cause));
     }
 
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RuntimeRequestException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RuntimeRequestException.java
new file mode 100644 (file)
index 0000000..2cf1828
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * General error raised when the recipient of a {@link Request} fails to process a request.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class RuntimeRequestException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    public RuntimeRequestException(final String message, final Throwable cause) {
+        super(message, cause);
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(message), "Exception message is mandatory");
+        Preconditions.checkNotNull(cause);
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return false;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/UnsupportedRequestException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/UnsupportedRequestException.java
new file mode 100644 (file)
index 0000000..903ed59
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.concepts;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * General error raised when the recipient of a {@link Request} determines that it does not know how to handle
+ * the request.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class UnsupportedRequestException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    public UnsupportedRequestException(final Request<?, ?> request) {
+        super("Unsupported request " + request.getClass());
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return false;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
new file mode 100644 (file)
index 0000000..7a66eab
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract class for providing logical tracking of frontend local histories. This class is specialized for
+ * standalone transactions and chained transactions.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class);
+    private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0);
+
+    private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
+    private final String persistenceId;
+
+    AbstractFrontendHistory(final String persistenceId) {
+        this.persistenceId = Preconditions.checkNotNull(persistenceId);
+    }
+
+    final String persistenceId() {
+        return persistenceId;
+    }
+
+    final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+            final RequestEnvelope envelope) throws RequestException {
+
+        // FIXME: handle purging of transactions
+
+        final TransactionIdentifier id = request.getTarget();
+        FrontendTransaction tx = transactions.get(id);
+        if (tx == null) {
+            // The transaction does not exist and we are about to create it, check sequence number
+            if (request.getSequence() != 0) {
+                LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
+                throw UNSEQUENCED_START;
+            }
+
+            if (request instanceof CommitLocalTransactionRequest) {
+                tx = createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
+                LOG.debug("{}: allocated new ready transaction {}", persistenceId(), id);
+            } else {
+                tx = createOpenTransaction(id);
+                LOG.debug("{}: allocated new open transaction {}", persistenceId(), id);
+            }
+
+            transactions.put(id, tx);
+        } else {
+            final Optional<TransactionSuccess<?>> replay = tx.replaySequence(request.getSequence());
+            if (replay.isPresent()) {
+                return replay.get();
+            }
+        }
+
+        return tx.handleRequest(request, envelope);
+    }
+
+    abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
+
+    abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod)
+        throws RequestException;
+
+    abstract ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod);
+}
index f7f8af292310e76e5213e47052abef3669351f30..222280cf3b0ade880f0486c56f2517dfb90b0620 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import com.google.common.base.Preconditions;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 
 /**
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 
 /**
@@ -19,17 +20,20 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
  * @param <T> Backing transaction type.
  */
 @NotThreadSafe
  * @param <T> Backing transaction type.
  */
 @NotThreadSafe
-abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot> {
-    private final T snapshot;
+abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
+        implements Identifiable<TransactionIdentifier> {
     private final TransactionIdentifier id;
     private final TransactionIdentifier id;
+    private final T snapshot;
+
     private boolean closed;
 
     private boolean closed;
 
-    protected AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) {
+    AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) {
         this.snapshot = Preconditions.checkNotNull(snapshot);
         this.id = Preconditions.checkNotNull(id);
     }
 
         this.snapshot = Preconditions.checkNotNull(snapshot);
         this.id = Preconditions.checkNotNull(id);
     }
 
-    final TransactionIdentifier getId() {
+    @Override
+    public final TransactionIdentifier getIdentifier() {
         return id;
     }
 
         return id;
     }
 
index 799dd6eb1bcddc382c48a244bb9ba081377334ad..d6b43c46a01cf795a190c1f27e9890b95811047b 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
@@ -65,8 +64,8 @@ final class ChainedCommitCohort extends ShardDataTreeCohort {
     }
 
     @Override
     }
 
     @Override
-    public ListenableFuture<Void> abort() {
-        return delegate.abort();
+    public void abort(final FutureCallback<Void> callback) {
+        delegate.abort(callback);
     }
 
     @Override
     }
 
     @Override
@@ -88,4 +87,5 @@ final class ChainedCommitCohort extends ShardDataTreeCohort {
     public State getState() {
         return delegate.getState();
     }
     public State getState() {
         return delegate.getState();
     }
+
 }
 }
index 84535b593eebb5da9b4df5f7353336ed5b287cd2..a66a49685fa2f1552b269a9c71b65e058cfab571 100644 (file)
@@ -11,8 +11,6 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -33,7 +31,7 @@ final class CohortEntry {
 
     private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
         this.transaction = Preconditions.checkNotNull(transaction);
 
     private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
         this.transaction = Preconditions.checkNotNull(transaction);
-        this.transactionId = transaction.getId();
+        this.transactionId = transaction.getIdentifier();
         this.clientVersion = clientVersion;
     }
 
         this.clientVersion = clientVersion;
     }
 
@@ -107,8 +105,8 @@ final class CohortEntry {
         cohort.commit(callback);
     }
 
         cohort.commit(callback);
     }
 
-    void abort() throws InterruptedException, ExecutionException, TimeoutException {
-        cohort.abort().get();
+    void abort(final FutureCallback<Void> callback) {
+        cohort.abort(callback);
     }
 
     void ready(final CohortDecorator cohortDecorator) {
     }
 
     void ready(final CohortDecorator cohortDecorator) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java
new file mode 100644 (file)
index 0000000..7b542db
--- /dev/null
@@ -0,0 +1,399 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Queue;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Frontend transaction state as observed by the shard leader.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class FrontendTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class);
+
+    private final AbstractFrontendHistory history;
+    private final TransactionIdentifier id;
+
+    /**
+     * It is possible that after we process a request and send a response that response gets lost and the client
+     * initiates a retry. Since subsequent requests can mutate transaction state we need to retain the response until
+     * it is acknowledged by the client.
+     */
+    private final Queue<Object> replayQueue = new ArrayDeque<>();
+    private long firstReplaySequence;
+    private Long lastPurgedSequence;
+    private long expectedSequence;
+
+    private ReadWriteShardDataTreeTransaction openTransaction;
+    private ModifyTransactionSuccess cachedModifySuccess;
+    private DataTreeModification sealedModification;
+    private ShardDataTreeCohort readyCohort;
+
+    private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
+            final ReadWriteShardDataTreeTransaction transaction) {
+        this.history = Preconditions.checkNotNull(history);
+        this.id = Preconditions.checkNotNull(id);
+        this.openTransaction = Preconditions.checkNotNull(transaction);
+    }
+
+    private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
+            final DataTreeModification mod) {
+        this.history = Preconditions.checkNotNull(history);
+        this.id = Preconditions.checkNotNull(id);
+        this.sealedModification = Preconditions.checkNotNull(mod);
+    }
+
+    static FrontendTransaction createOpen(final AbstractFrontendHistory history,
+            final ReadWriteShardDataTreeTransaction transaction) {
+        return new FrontendTransaction(history, transaction.getIdentifier(), transaction);
+    }
+
+    static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id,
+            final DataTreeModification mod) {
+        return new FrontendTransaction(history, id, mod);
+    }
+
+    java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
+        // Fast path check: if the requested sequence is the next request, bail early
+        if (expectedSequence == sequence) {
+            return java.util.Optional.empty();
+        }
+
+        // Check sequencing: we do not need to bother with future requests
+        if (Long.compareUnsigned(expectedSequence, sequence) < 0) {
+            throw new OutOfOrderRequestException(expectedSequence);
+        }
+
+        // Sanity check: if we have purged sequences, this has to be newer
+        if (lastPurgedSequence != null && Long.compareUnsigned(lastPurgedSequence.longValue(), sequence) >= 0) {
+            // Client has sent a request sequence, which has already been purged. This is a hard error, which should
+            // never occur. Throwing an IllegalArgumentException will cause it to be wrapped in a
+            // RuntimeRequestException (which is not retriable) and report it back to the client.
+            throw new IllegalArgumentException(String.format("Invalid purged sequence %s (last purged is %s)",
+                sequence, lastPurgedSequence));
+        }
+
+        // At this point we have established that the requested sequence lies in the open interval
+        // (lastPurgedSequence, expectedSequence). That does not actually mean we have a response, as the commit
+        // machinery is asynchronous, hence a reply may be in the works and not available.
+
+        long replaySequence = firstReplaySequence;
+        final Iterator<?> it = replayQueue.iterator();
+        while (it.hasNext()) {
+            final Object replay = it.next();
+            if (replaySequence == sequence) {
+                if (replay instanceof RequestException) {
+                    throw (RequestException) replay;
+                }
+
+                Verify.verify(replay instanceof TransactionSuccess);
+                return java.util.Optional.of((TransactionSuccess<?>) replay);
+            }
+
+            replaySequence++;
+        }
+
+        // Not found
+        return java.util.Optional.empty();
+    }
+
+    void purgeSequencesUpTo(final long sequence) {
+        // FIXME: implement this
+
+        lastPurgedSequence = sequence;
+    }
+
+    // Sequence has already been checked
+    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope)
+            throws RequestException {
+        if (request instanceof ModifyTransactionRequest) {
+            return handleModifyTransaction((ModifyTransactionRequest) request, envelope);
+        } else if (request instanceof CommitLocalTransactionRequest) {
+            handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope);
+            return null;
+        } else if (request instanceof ExistsTransactionRequest) {
+            return handleExistsTransaction((ExistsTransactionRequest) request);
+        } else if (request instanceof ReadTransactionRequest) {
+            return handleReadTransaction((ReadTransactionRequest) request);
+        } else if (request instanceof TransactionPreCommitRequest) {
+            handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope);
+            return null;
+        } else if (request instanceof TransactionDoCommitRequest) {
+            handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope);
+            return null;
+        } else if (request instanceof TransactionAbortRequest) {
+            handleTransactionAbort((TransactionAbortRequest) request, envelope);
+            return null;
+        } else {
+            throw new UnsupportedRequestException(request);
+        }
+    }
+
+    private void recordResponse(final long sequence, final Object response) {
+        if (replayQueue.isEmpty()) {
+            firstReplaySequence = sequence;
+        }
+        replayQueue.add(response);
+        expectedSequence++;
+    }
+
+    private <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
+        recordResponse(sequence, success);
+        return success;
+    }
+
+    private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess<?> success) {
+        recordResponse(success.getSequence(), success);
+        envelope.sendSuccess(success);
+    }
+
+    private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) {
+        recordResponse(envelope.getMessage().getSequence(), failure);
+        envelope.sendFailure(failure);
+    }
+
+    private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
+            final RequestEnvelope envelope) throws RequestException {
+        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+            @Override
+            public void onSuccess(final DataTreeCandidate result) {
+                recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
+                    request.getSequence()));
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope)
+            throws RequestException {
+        readyCohort.commit(new FutureCallback<UnsignedLong>() {
+            @Override
+            public void onSuccess(final UnsignedLong result) {
+                successfulCommit(envelope);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope)
+            throws RequestException {
+        readyCohort.abort(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                readyCohort = null;
+                recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence()));
+                LOG.debug("Transaction {} aborted", id);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                readyCohort = null;
+                LOG.warn("Transaction {} abort failed", id, failure);
+                recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure));
+            }
+        });
+    }
+
+    private void coordinatedCommit(final RequestEnvelope envelope) {
+        readyCohort.canCommit(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
+                    envelope.getMessage().getSequence()));
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private void directCommit(final RequestEnvelope envelope) {
+        readyCohort.canCommit(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                successfulDirectCanCommit(envelope);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+                readyCohort = null;
+            }
+        });
+
+    }
+
+    private void successfulDirectCanCommit(final RequestEnvelope envelope) {
+        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+            @Override
+            public void onSuccess(final DataTreeCandidate result) {
+                successfulDirectPreCommit(envelope);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private void successfulDirectPreCommit(final RequestEnvelope envelope) {
+        readyCohort.commit(new FutureCallback<UnsignedLong>() {
+
+            @Override
+            public void onSuccess(final UnsignedLong result) {
+                successfulCommit(envelope);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure));
+                readyCohort = null;
+            }
+        });
+    }
+
+    private void successfulCommit(final RequestEnvelope envelope) {
+        recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+            envelope.getMessage().getSequence()));
+        readyCohort = null;
+    }
+
+    private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
+            final RequestEnvelope envelope) throws RequestException {
+        if (sealedModification.equals(request.getModification())) {
+            readyCohort = history.createReadyCohort(id, sealedModification);
+
+            if (request.isCoordinated()) {
+                coordinatedCommit(envelope);
+            } else {
+                directCommit(envelope);
+            }
+        } else {
+            throw new UnsupportedRequestException(request);
+        }
+    }
+
+    private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
+            throws RequestException {
+        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(),
+            data.isPresent()));
+    }
+
+    private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException {
+        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
+    }
+
+    private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
+        if (cachedModifySuccess == null) {
+            cachedModifySuccess = new ModifyTransactionSuccess(id, sequence);
+        }
+
+        return recordSuccess(sequence, cachedModifySuccess);
+    }
+
+    private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
+            final RequestEnvelope envelope) throws RequestException {
+
+        final DataTreeModification modification = openTransaction.getSnapshot();
+        for (TransactionModification m : request.getModifications()) {
+            if (m instanceof TransactionDelete) {
+                modification.delete(m.getPath());
+            } else if (m instanceof TransactionWrite) {
+                modification.write(m.getPath(), ((TransactionWrite) m).getData());
+            } else if (m instanceof TransactionMerge) {
+                modification.merge(m.getPath(), ((TransactionMerge) m).getData());
+            } else {
+                LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m);
+            }
+        }
+
+        final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
+        if (!maybeProto.isPresent()) {
+            return replyModifySuccess(request.getSequence());
+        }
+
+        switch (maybeProto.get()) {
+            case ABORT:
+                openTransaction.abort();
+                openTransaction = null;
+                return replyModifySuccess(request.getSequence());
+            case SIMPLE:
+                readyCohort = openTransaction.ready();
+                openTransaction = null;
+                directCommit(envelope);
+                return null;
+            case THREE_PHASE:
+                readyCohort = openTransaction.ready();
+                openTransaction = null;
+                coordinatedCommit(envelope);
+                return null;
+            default:
+                throw new UnsupportedRequestException(request);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
new file mode 100644 (file)
index 0000000..3c65b79
--- /dev/null
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
+import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
+import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.UnknownHistoryException;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing
+ * in the frontend/backend conversation.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
+
+    // Histories which have not been purged
+    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+
+    // RangeSet performs automatic merging, hence we keep minimal state tracking information
+    private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+
+    // Used for all standalone transactions
+    private final AbstractFrontendHistory standaloneHistory;
+    private final ShardDataTree tree;
+    private final ClientIdentifier clientId;
+    private final String persistenceId;
+
+    private long expectedTxSequence;
+    private Long lastSeenHistory = null;
+
+
+    // TODO: explicit failover notification
+    //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
+    //       to the frontend client -- that way it can immediately start sending requests
+
+    // TODO: add statistics:
+    // - number of requests processed
+    // - number of histories processed
+    // - per-RequestException throw counters
+
+    LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+        this.persistenceId = Preconditions.checkNotNull(persistenceId);
+        this.clientId = Preconditions.checkNotNull(clientId);
+        this.tree = Preconditions.checkNotNull(tree);
+        standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree);
+    }
+
+    @Override
+    public ClientIdentifier getIdentifier() {
+        return clientId;
+    }
+
+    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException {
+        if (expectedTxSequence != envelope.getTxSequence()) {
+            throw new OutOfOrderRequestException(expectedTxSequence);
+        }
+    }
+
+    private void expectNextRequest() {
+        expectedTxSequence++;
+    }
+
+    @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
+            final RequestEnvelope envelope) throws RequestException {
+        checkRequestSequence(envelope);
+
+        try {
+            if (request instanceof CreateLocalHistoryRequest) {
+                return handleCreateHistory((CreateLocalHistoryRequest) request);
+            } else if (request instanceof DestroyLocalHistoryRequest) {
+                return handleDestroyHistory((DestroyLocalHistoryRequest) request);
+            } else if (request instanceof PurgeLocalHistoryRequest) {
+                return handlePurgeHistory((PurgeLocalHistoryRequest)request);
+            } else {
+                throw new UnsupportedRequestException(request);
+            }
+        } finally {
+            expectNextRequest();
+        }
+    }
+
+    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
+        final LocalHistoryIdentifier id = request.getTarget();
+        final AbstractFrontendHistory existing = localHistories.get(id);
+        if (existing != null) {
+            // History already exists: report success
+            LOG.debug("{}: history {} already exists", persistenceId, id);
+            return new LocalHistorySuccess(id, request.getSequence());
+        }
+
+        // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
+        // end up resurrecting a purged history.
+        if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+            LOG.debug("{}: rejecting purged request {}", persistenceId, request);
+            throw new DeadHistoryException(lastSeenHistory.longValue());
+        }
+
+        // Update last history we have seen
+        if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
+            lastSeenHistory = id.getHistoryId();
+        }
+
+        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id)));
+        LOG.debug("{}: created history {}", persistenceId, id);
+        return new LocalHistorySuccess(id, request.getSequence());
+    }
+
+    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException {
+        final LocalHistoryIdentifier id = request.getTarget();
+        final LocalFrontendHistory existing = localHistories.get(id);
+        if (existing == null) {
+            // History does not exist: report success
+            LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id);
+            return new LocalHistorySuccess(id, request.getSequence());
+        }
+
+        return existing.destroy(request.getSequence());
+    }
+
+    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException {
+        final LocalHistoryIdentifier id = request.getTarget();
+        final LocalFrontendHistory existing = localHistories.remove(id);
+        if (existing != null) {
+            purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+
+            if (!existing.isDestroyed()) {
+                LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
+                existing.destroy(request.getSequence());
+            }
+
+            // FIXME: record a PURGE tombstone in the journal
+
+            LOG.debug("{}: purged history {}", persistenceId, id);
+        } else {
+            LOG.debug("{}: history {} has already been purged", persistenceId, id);
+        }
+
+        return new LocalHistorySuccess(id, request.getSequence());
+    }
+
+    @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+            final RequestEnvelope envelope) throws RequestException {
+        checkRequestSequence(envelope);
+
+        try {
+            final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
+            final AbstractFrontendHistory history;
+
+            if (lhId.getHistoryId() != 0) {
+                history = localHistories.get(lhId);
+                if (history == null) {
+                    LOG.debug("{}: rejecting unknown history request {}", persistenceId, request);
+                    throw new UnknownHistoryException(lastSeenHistory);
+                }
+            } else {
+                history = standaloneHistory;
+            }
+
+            return history.handleTransactionRequest(request, envelope);
+        } finally {
+            expectNextRequest();
+        }
+    }
+
+    void reconnect() {
+        expectedTxSequence = 0;
+    }
+
+    void retire() {
+        // FIXME: flush all state
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId)
+                .add("purgedHistories", purgedHistories).toString();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
new file mode 100644 (file)
index 0000000..a03b54f
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chained transaction specialization of {@link AbstractFrontendHistory}. It prevents concurrent open transactions.
+ *
+ * @author Robert Varga
+ */
+final class LocalFrontendHistory extends AbstractFrontendHistory {
+    private enum State {
+        OPEN,
+        CLOSED,
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class);
+
+    private final ShardDataTreeTransactionChain chain;
+
+    private Long lastSeenTransaction;
+    private State state = State.OPEN;
+
+    LocalFrontendHistory(final String persistenceId, final ShardDataTreeTransactionChain chain) {
+        super(persistenceId);
+        this.chain = Preconditions.checkNotNull(chain);
+    }
+
+    @Override
+    public LocalHistoryIdentifier getIdentifier() {
+        return chain.getIdentifier();
+    }
+
+    @Override
+    FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
+        checkDeadTransaction(id);
+        lastSeenTransaction = id.getTransactionId();
+        return FrontendTransaction.createOpen(this, chain.newReadWriteTransaction(id));
+    }
+
+    @Override
+    FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod)
+            throws RequestException {
+        checkDeadTransaction(id);
+        lastSeenTransaction = id.getTransactionId();
+        return FrontendTransaction.createReady(this, id, mod);
+    }
+
+    @Override
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
+        return chain.createReadyCohort(id, mod);
+    }
+
+    LocalHistorySuccess destroy(final long sequence) throws RequestException {
+        if (state != State.CLOSED) {
+            LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
+
+            // FIXME: add any finalization as needed
+            state = State.CLOSED;
+        }
+
+        // FIXME: record a DESTROY tombstone in the journal
+        return new LocalHistorySuccess(getIdentifier(), sequence);
+    }
+
+    boolean isDestroyed() {
+        return state == State.CLOSED;
+    }
+
+    private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException {
+        // FIXME: check if this history is still open
+        // FIXME: check if the last transaction has been submitted
+
+        // Transaction identifiers within a local history have to have increasing IDs
+        if (lastSeenTransaction != null && Long.compareUnsigned(lastSeenTransaction, id.getTransactionId()) >= 0) {
+            throw new DeadTransactionException(lastSeenTransaction);
+        }
+    }
+}
index 771de8cd12e6f5a31440c7edbf244103e7852832..492f6ec9f62efa2314ada5b8e27cb9fa3c94cb25 100644 (file)
@@ -14,7 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
     private final ShardDataTreeTransactionParent parent;
 
 public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
     private final ShardDataTreeTransactionParent parent;
 
-    protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent,
+    ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent,
             final TransactionIdentifier id, final DataTreeModification modification) {
         super(id, modification);
         this.parent = Preconditions.checkNotNull(parent);
             final TransactionIdentifier id, final DataTreeModification modification) {
         super(id, modification);
         this.parent = Preconditions.checkNotNull(parent);
index c98e11d60a2e063395ed0b428d2eb5baaee77ea2..4a1be9862b7fb6114511c068dd255f17e4b59a1e 100644 (file)
@@ -12,18 +12,39 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.Props;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Range;
 import java.io.IOException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
+import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
@@ -96,6 +117,17 @@ public class Shard extends RaftActor {
     // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
     public static final String DEFAULT_NAME = "default";
 
     // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
     public static final String DEFAULT_NAME = "default";
 
+    private static final Collection<ABIVersion> SUPPORTED_ABIVERSIONS;
+
+    static {
+        final ABIVersion[] values = ABIVersion.values();
+        final ABIVersion[] real = Arrays.copyOfRange(values, 1, values.length - 1);
+        SUPPORTED_ABIVERSIONS = ImmutableList.copyOf(real).reverse();
+    }
+
+    // FIXME: make this a dynamic property based on mailbox size and maximum number of clients
+    private static final int CLIENT_MAX_MESSAGES = 1000;
+
     // The state of this Shard
     private final ShardDataTree store;
 
     // The state of this Shard
     private final ShardDataTree store;
 
@@ -129,6 +161,7 @@ public class Shard extends RaftActor {
     private final ShardTransactionMessageRetrySupport messageRetrySupport;
 
     private final FrontendMetadata frontendMetadata = new FrontendMetadata();
     private final ShardTransactionMessageRetrySupport messageRetrySupport;
 
     private final FrontendMetadata frontendMetadata = new FrontendMetadata();
+    private final Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = new HashMap<>();
 
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
 
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
@@ -219,6 +252,7 @@ public class Shard extends RaftActor {
         }
     }
 
         }
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
     protected void handleNonRaftCommand(final Object message) {
         try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
     @Override
     protected void handleNonRaftCommand(final Object message) {
         try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
@@ -228,7 +262,23 @@ public class Shard extends RaftActor {
                     maybeError.get());
             }
 
                     maybeError.get());
             }
 
-            if (CreateTransaction.isSerializedType(message)) {
+            if (message instanceof RequestEnvelope) {
+                final RequestEnvelope envelope = (RequestEnvelope)message;
+                try {
+                    final RequestSuccess<?, ?> success = handleRequest(envelope);
+                    if (success != null) {
+                        envelope.sendSuccess(success);
+                    }
+                } catch (RequestException e) {
+                    LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
+                    envelope.sendFailure(e);
+                } catch (Exception e) {
+                    LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
+                    envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+                }
+            } else if (message instanceof ConnectClientRequest) {
+                handleConnectClient((ConnectClientRequest)message);
+            } else if (CreateTransaction.isSerializedType(message)) {
                 handleCreateTransaction(message);
             } else if (message instanceof BatchedModifications) {
                 handleBatchedModifications((BatchedModifications)message);
                 handleCreateTransaction(message);
             } else if (message instanceof BatchedModifications) {
                 handleBatchedModifications((BatchedModifications)message);
@@ -280,6 +330,86 @@ public class Shard extends RaftActor {
         }
     }
 
         }
     }
 
+    // Acquire our frontend tracking handle and verify generation matches
+    private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+        final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
+        if (existing != null) {
+            final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
+            if (cmp == 0) {
+                return existing;
+            }
+            if (cmp > 0) {
+                LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
+                throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+            }
+
+            LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
+            existing.retire();
+            knownFrontends.remove(clientId.getFrontendId());
+        } else {
+            LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
+        }
+
+        final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store);
+        knownFrontends.put(clientId.getFrontendId(), ret);
+        LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId);
+        return ret;
+    }
+
+    private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
+        final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
+        for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
+            if (clientRange.contains(v)) {
+                return v;
+            }
+        }
+
+        throw new IllegalArgumentException(String.format(
+            "No common version between backend versions %s and client versions %s", SUPPORTED_ABIVERSIONS,
+            clientRange));
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void handleConnectClient(final ConnectClientRequest message) {
+        try {
+            if (!isLeader() || !isLeaderActive()) {
+                LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message);
+                throw new NotLeaderException(getSelf());
+            }
+
+            final ABIVersion selectedVersion = selectVersion(message);
+            final LeaderFrontendState frontend = getFrontend(message.getTarget());
+            frontend.reconnect();
+            message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
+                ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),
+                ActorRef.noSender());
+        } catch (RequestException | RuntimeException e) {
+            message.getReplyTo().tell(new Failure(e), ActorRef.noSender());
+        }
+    }
+
+    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+        // We are not the leader, hence we want to fail-fast.
+        if (!isLeader() || !isLeaderActive()) {
+            LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
+            throw new NotLeaderException(getSelf());
+        }
+
+        final Request<?, ?> request = envelope.getMessage();
+        if (request instanceof TransactionRequest) {
+            final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
+            final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
+            return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+        } else if (request instanceof LocalHistoryRequest) {
+            final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
+            final ClientIdentifier clientId = lhReq.getTarget().getClientId();
+            return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+        } else {
+            LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
+            throw new UnsupportedRequestException(request);
+        }
+    }
+
     private boolean hasLeader() {
         return getLeaderId() != null;
     }
     private boolean hasLeader() {
         return getLeaderId() != null;
     }
@@ -365,7 +495,7 @@ public class Shard extends RaftActor {
         } catch (Exception e) {
             LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                     batched.getTransactionId(), e);
         } catch (Exception e) {
             LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                     batched.getTransactionId(), e);
-            sender.tell(new akka.actor.Status.Failure(e), getSelf());
+            sender.tell(new Failure(e), getSelf());
         }
     }
 
         }
     }
 
@@ -411,7 +541,7 @@ public class Shard extends RaftActor {
 
     private boolean failIfIsolatedLeader(final ActorRef sender) {
         if (isIsolatedLeader()) {
 
     private boolean failIfIsolatedLeader(final ActorRef sender) {
         if (isIsolatedLeader()) {
-            sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+            sender.tell(new Failure(new NoShardLeaderException(String.format(
                     "Shard %s was the leader but has lost contact with all of its followers. Either all"
                     + " other follower nodes are down or this node is isolated by a network partition.",
                     persistenceId()))), getSelf());
                     "Shard %s was the leader but has lost contact with all of its followers. Either all"
                     + " other follower nodes are down or this node is isolated by a network partition.",
                     persistenceId()))), getSelf());
@@ -436,7 +566,7 @@ public class Shard extends RaftActor {
             } catch (Exception e) {
                 LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                         message.getTransactionId(), e);
             } catch (Exception e) {
                 LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                         message.getTransactionId(), e);
-                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+                getSender().tell(new Failure(e), getSelf());
             }
         } else {
             ActorSelection leader = getLeader();
             }
         } else {
             ActorSelection leader = getLeader();
@@ -487,7 +617,7 @@ public class Shard extends RaftActor {
         } else if (getLeader() != null) {
             getLeader().forward(message, getContext());
         } else {
         } else if (getLeader() != null) {
             getLeader().forward(message, getContext());
         } else {
-            getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
+            getSender().tell(new Failure(new NoShardLeaderException(
                     "Could not create a shard transaction", persistenceId())), getSelf());
         }
     }
                     "Could not create a shard transaction", persistenceId())), getSelf());
         }
     }
@@ -510,7 +640,7 @@ public class Shard extends RaftActor {
             getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
                     createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
         } catch (Exception e) {
             getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
                     createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
         } catch (Exception e) {
-            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+            getSender().tell(new Failure(e), getSelf());
         }
     }
 
         }
     }
 
index 2773a3e3bfd21ad4559bb6baf05c1d48661cfe9c..33634b1d6c3e322642e9b458e05d59feb74a577e 100644 (file)
@@ -23,8 +23,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -370,21 +368,25 @@ final class ShardCommitCoordinator {
         log.debug("{}: Aborting transaction {}", name, transactionID);
 
         final ActorRef self = shard.getSelf();
         log.debug("{}: Aborting transaction {}", name, transactionID);
 
         final ActorRef self = shard.getSelf();
-        try {
-            cohortEntry.abort();
+        cohortEntry.abort(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                if (sender != null) {
+                    sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+                }
+            }
 
 
-            shard.getShardMBean().incrementAbortTransactionsCount();
+            @Override
+            public void onFailure(final Throwable failure) {
+                log.error("{}: An exception happened during abort", name, failure);
 
 
-            if (sender != null) {
-                sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+                if (sender != null) {
+                    sender.tell(new Failure(failure), self);
+                }
             }
             }
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            log.error("{}: An exception happened during abort", name, e);
+        });
 
 
-            if (sender != null) {
-                sender.tell(new Failure(e), self);
-            }
-        }
+        shard.getShardMBean().incrementAbortTransactionsCount();
     }
 
     void checkForExpiredTransactions(final long timeout, final Shard shard) {
     }
 
     void checkForExpiredTransactions(final long timeout, final Shard shard) {
index 7f3476090bb9a3a177e4999f41c3a18768949d3c..ef6436ae8dbe3ada08626771bb11f8a2c876356b 100644 (file)
@@ -367,7 +367,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
         }
     }
 
-    private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) {
         ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
         ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this);
@@ -475,7 +475,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
 
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
 
-        return createReadyCohort(transaction.getId(), snapshot);
+        return createReadyCohort(transaction.getIdentifier(), snapshot);
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
@@ -668,6 +668,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         cohortRegistry.process(sender, message);
     }
 
         cohortRegistry.process(sender, message);
     }
 
+    @Override
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
             final DataTreeModification modification) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
             final DataTreeModification modification) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
index 0a3a6ae177d7827cf2afb6441f3f6006a9209795..4df9dea7dbdb4038403f0e0795b797e0bf10e000 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -49,7 +48,7 @@ public abstract class ShardDataTreeCohort implements Identifiable<TransactionIde
     public abstract void preCommit(FutureCallback<DataTreeCandidate> callback);
 
     @VisibleForTesting
     public abstract void preCommit(FutureCallback<DataTreeCandidate> callback);
 
     @VisibleForTesting
-    public abstract ListenableFuture<Void> abort();
+    public abstract void abort(FutureCallback<Void> callback);
 
     @VisibleForTesting
     public abstract void commit(FutureCallback<UnsignedLong> callback);
 
     @VisibleForTesting
     public abstract void commit(FutureCallback<UnsignedLong> callback);
index 004e305f70ccf49d09a55694a3382a32b4fa54d9..9a8e89eb422c4b4f909e6a6c3ea97366d1ab01f2 100644 (file)
@@ -12,6 +12,8 @@ import com.google.common.base.Preconditions;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -20,18 +22,19 @@ import org.slf4j.LoggerFactory;
  * A transaction chain attached to a Shard.
  */
 @NotThreadSafe
  * A transaction chain attached to a Shard.
  */
 @NotThreadSafe
-final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent {
+final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
+        implements Identifiable<LocalHistoryIdentifier> {
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
-    private final ShardDataTree dataTree;
     private final LocalHistoryIdentifier chainId;
     private final LocalHistoryIdentifier chainId;
+    private final ShardDataTree dataTree;
 
     private ReadWriteShardDataTreeTransaction previousTx;
     private ReadWriteShardDataTreeTransaction openTransaction;
     private boolean closed;
 
     ShardDataTreeTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier, final ShardDataTree dataTree) {
 
     private ReadWriteShardDataTreeTransaction previousTx;
     private ReadWriteShardDataTreeTransaction openTransaction;
     private boolean closed;
 
     ShardDataTreeTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier, final ShardDataTree dataTree) {
-        this.dataTree = Preconditions.checkNotNull(dataTree);
         this.chainId = Preconditions.checkNotNull(localHistoryIdentifier);
         this.chainId = Preconditions.checkNotNull(localHistoryIdentifier);
+        this.dataTree = Preconditions.checkNotNull(dataTree);
     }
 
     private DataTreeSnapshot getSnapshot() {
     }
 
     private DataTreeSnapshot getSnapshot() {
@@ -94,9 +97,19 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
         return MoreObjects.toStringHelper(this).add("id", chainId).toString();
     }
 
         return MoreObjects.toStringHelper(this).add("id", chainId).toString();
     }
 
-    void clearTransaction(ReadWriteShardDataTreeTransaction transaction) {
+    void clearTransaction(final ReadWriteShardDataTreeTransaction transaction) {
         if (transaction.equals(previousTx)) {
             previousTx = null;
         }
     }
         if (transaction.equals(previousTx)) {
             previousTx = null;
         }
     }
+
+    @Override
+    public LocalHistoryIdentifier getIdentifier() {
+        return chainId;
+    }
+
+    @Override
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) {
+        return dataTree.createReadyCohort(txId, modification);
+    }
 }
 }
index 26e17057a741d0cdc4beed33daa777a452181516..20f50156f8fdd8ef9ea16c34393a586982adaca3 100644 (file)
@@ -7,8 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
 abstract class ShardDataTreeTransactionParent {
 abstract class ShardDataTreeTransactionParent {
+
     abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
 
     abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
     abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
 
     abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
+
+    abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
 }
 }
index dfb0897b4850235d11d401b97d6759406a04f6ba..3978d757a1e64b8a01ed4e529c1d90e158040d27 100644 (file)
@@ -24,7 +24,7 @@ public class ShardReadTransaction extends ShardTransaction {
 
     public ShardReadTransaction(AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
             ShardStats shardStats) {
 
     public ShardReadTransaction(AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
             ShardStats shardStats) {
-        super(shardActor, shardStats, transaction.getId());
+        super(shardActor, shardStats, transaction.getIdentifier());
         this.transaction = Preconditions.checkNotNull(transaction);
     }
 
         this.transaction = Preconditions.checkNotNull(transaction);
     }
 
index 9c304ecdad84a0bd4964c1e48707286bc0192a2d..56683073cfc5dab4700a482646c467626fa1f49a 100644 (file)
@@ -33,7 +33,7 @@ public class ShardWriteTransaction extends ShardTransaction {
 
     public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
             ShardStats shardStats) {
 
     public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
             ShardStats shardStats) {
-        super(shardActor, shardStats, transaction.getId());
+        super(shardActor, shardStats, transaction.getIdentifier());
         this.transaction = transaction;
     }
 
         this.transaction = transaction;
     }
 
index 7da174fec583bd1f2ac998383fc8f3677d019712..0527d013f246357ba68def42af0b76146b009741 100644 (file)
@@ -15,7 +15,6 @@ import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -93,34 +92,34 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
         }
     }
 
         }
     }
 
+
     @Override
     @Override
-    public ListenableFuture<Void> abort() {
+    public void abort(final FutureCallback<Void> callback) {
         dataTree.startAbort(this);
         state = State.ABORTED;
 
         final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
         if (!maybeAborts.isPresent()) {
         dataTree.startAbort(this);
         state = State.ABORTED;
 
         final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
         if (!maybeAborts.isPresent()) {
-            return VOID_FUTURE;
+            callback.onSuccess(null);
+            return;
         }
 
         final Future<Iterable<Object>> aborts = maybeAborts.get();
         if (aborts.isCompleted()) {
         }
 
         final Future<Iterable<Object>> aborts = maybeAborts.get();
         if (aborts.isCompleted()) {
-            return VOID_FUTURE;
+            callback.onSuccess(null);
+            return;
         }
 
         }
 
-        final SettableFuture<Void> ret = SettableFuture.create();
         aborts.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
             public void onComplete(final Throwable failure, final Iterable<Object> objs) {
                 if (failure != null) {
         aborts.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
             public void onComplete(final Throwable failure, final Iterable<Object> objs) {
                 if (failure != null) {
-                    ret.setException(failure);
+                    callback.onFailure(failure);
                 } else {
                 } else {
-                    ret.set(null);
+                    callback.onSuccess(null);
                 }
             }
         }, ExecutionContexts.global());
                 }
             }
         }, ExecutionContexts.global());
-
-        return ret;
     }
 
     @Override
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java
new file mode 100644 (file)
index 0000000..1f2eb72
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
+/**
+ * Standalone transaction specialization of {@link AbstractFrontendHistory}. There can be multiple open transactions
+ * and they are submitted in any order.
+ *
+ * @author Robert Varga
+ */
+final class StandaloneFrontendHistory extends AbstractFrontendHistory {
+    private final LocalHistoryIdentifier identifier;
+    private final ShardDataTree tree;
+
+    StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+        super(persistenceId);
+        this.identifier = new LocalHistoryIdentifier(clientId, 0);
+        this.tree = Preconditions.checkNotNull(tree);
+    }
+
+    @Override
+    public LocalHistoryIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException {
+        return FrontendTransaction.createOpen(this, tree.newReadWriteTransaction(id));
+    }
+
+    @Override
+    FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod)
+            throws RequestException {
+        return FrontendTransaction.createReady(this, id, mod);
+    }
+
+    @Override
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
+        return tree.createReadyCohort(id, mod);
+    }
+}
index d0302eb6feae94a6acb525a5a048a67a437fe275..ab7e8f040159471c3290713797d2105a321584a8 100644 (file)
@@ -31,7 +31,6 @@ import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.base.Optional;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
@@ -437,7 +436,7 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         private FutureCallback<DataTreeCandidate> preCommit;
         private FutureCallback<UnsignedLong> commit;
 
         private FutureCallback<DataTreeCandidate> preCommit;
         private FutureCallback<UnsignedLong> commit;
 
-        public void setDelegate(ShardDataTreeCohort delegate) {
+        public void setDelegate(final ShardDataTreeCohort delegate) {
             this.delegate = delegate;
         }
 
             this.delegate = delegate;
         }
 
@@ -472,19 +471,19 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         }
 
         @Override
         }
 
         @Override
-        public void canCommit(FutureCallback<Void> callback) {
+        public void canCommit(final FutureCallback<Void> callback) {
             canCommit = mockFutureCallback(callback);
             delegate.canCommit(canCommit);
         }
 
         @Override
             canCommit = mockFutureCallback(callback);
             delegate.canCommit(canCommit);
         }
 
         @Override
-        public void preCommit(FutureCallback<DataTreeCandidate> callback) {
+        public void preCommit(final FutureCallback<DataTreeCandidate> callback) {
             preCommit = mockFutureCallback(callback);
             delegate.preCommit(preCommit);
         }
 
         @Override
             preCommit = mockFutureCallback(callback);
             delegate.preCommit(preCommit);
         }
 
         @Override
-        public void commit(FutureCallback<UnsignedLong> callback) {
+        public void commit(final FutureCallback<UnsignedLong> callback) {
             commit = mockFutureCallback(callback);
             delegate.commit(commit);
         }
             commit = mockFutureCallback(callback);
             delegate.commit(commit);
         }
@@ -506,8 +505,8 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         }
 
         @Override
         }
 
         @Override
-        public ListenableFuture<Void> abort() {
-            return delegate.abort();
+        public void abort(final FutureCallback<Void> callback) {
+            delegate.abort(callback);
         }
 
         @Override
         }
 
         @Override
index 232d9aa618a0b88909d00d7788dee12df8e5a11f..7d2500b7168249685752f68505697e1662ed5c35 100644 (file)
@@ -19,9 +19,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -210,12 +211,28 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
         verify(mockUserCohorts).abort();
     }
 
         verify(mockUserCohorts).abort();
     }
 
+    private static Future<?> abort(final ShardDataTreeCohort cohort) {
+        final CompletableFuture<Void> f = new CompletableFuture<>();
+        cohort.abort(new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                f.complete(null);
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                f.completeExceptionally(failure);
+            }
+        });
+
+        return f;
+    }
+
     @Test
     public void testAbort() throws Exception {
         doNothing().when(mockShardDataTree).startAbort(cohort);
 
     @Test
     public void testAbort() throws Exception {
         doNothing().when(mockShardDataTree).startAbort(cohort);
 
-        cohort.abort().get();
-
+        abort(cohort).get();
         verify(mockShardDataTree).startAbort(cohort);
     }
 
         verify(mockShardDataTree).startAbort(cohort);
     }
 
@@ -226,7 +243,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest {
         final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
         doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();
 
         final Promise<Iterable<Object>> cohortFuture = akka.dispatch.Futures.promise();
         doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort();
 
-        final ListenableFuture<Void> abortFuture = cohort.abort();
+        final Future<?> abortFuture = abort(cohort);
 
         cohortFuture.success(Collections.emptyList());
 
 
         cohortFuture.success(Collections.emptyList());