BUG-5280: implement transaction dispatch 46/39946/63
authorRobert Varga <rovarga@cisco.com>
Tue, 7 Jun 2016 12:59:14 +0000 (14:59 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Tue, 26 Jul 2016 08:49:16 +0000 (08:49 +0000)
This patch adds the DOMStore interface in DistributedDataStoreClient
and defines the missing messages.

Change-Id: I6b0905fb97e3269c12a5cd8f2c681e4caeb14e3e
Signed-off-by: Robert Varga <rovarga@cisco.com>
52 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalHistoryRequestProxy.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequestProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailure.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequest.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalAbortable.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/VotingFuture.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfoResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java

diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalHistoryRequestProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalHistoryRequestProxy.java
new file mode 100644 (file)
index 0000000..0fbeecc
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Abstract base class for serialization proxies associated with {@link LocalHistoryRequest}s.
+ *
+ * @author Robert Varga
+ *
+ * @param <T> Message type
+ */
+abstract class AbstractLocalHistoryRequestProxy<T extends LocalHistoryRequest<T>> extends AbstractRequestProxy<LocalHistoryIdentifier, T> {
+    private static final long serialVersionUID = 1L;
+
+    AbstractLocalHistoryRequestProxy() {
+        // For Externalizable
+    }
+
+    AbstractLocalHistoryRequestProxy(final T request) {
+        super(request);
+    }
+
+    @Override
+    protected final LocalHistoryIdentifier readTarget(final DataInput in) throws IOException {
+        return LocalHistoryIdentifier.readFrom(in);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java
new file mode 100644 (file)
index 0000000..28a77c4
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Request to create a new local history.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class CreateLocalHistoryRequest extends LocalHistoryRequest<CreateLocalHistoryRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+        super(target, replyTo);
+    }
+
+    private CreateLocalHistoryRequest(final CreateLocalHistoryRequest request, final ABIVersion version) {
+        super(request, version);
+    }
+
+    @Override
+    protected AbstractLocalHistoryRequestProxy<CreateLocalHistoryRequest> externalizableProxy(final ABIVersion version) {
+        return new CreateLocalHistoryRequestProxyV1(this);
+    }
+
+    @Override
+    protected CreateLocalHistoryRequest cloneAsVersion(final ABIVersion version) {
+        return new CreateLocalHistoryRequest(this, version);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequestProxyV1.java
new file mode 100644 (file)
index 0000000..234e477
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link CreateLocalHistoryRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class CreateLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestProxy<CreateLocalHistoryRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public CreateLocalHistoryRequestProxyV1() {
+        // For Externalizable
+    }
+
+    CreateLocalHistoryRequestProxyV1(final CreateLocalHistoryRequest request) {
+        super(request);
+    }
+
+    @Override
+    protected CreateLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+        return new CreateLocalHistoryRequest(target, replyTo);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java
new file mode 100644 (file)
index 0000000..bf86c8b
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received a request to create a history which has already
+ * been retired.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class DeadHistoryException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    public DeadHistoryException(final long lastSeenHistory) {
+        super("Histories up to " + Long.toUnsignedString(lastSeenHistory) + " are accounted for");
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return true;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequest.java
new file mode 100644 (file)
index 0000000..1997ddd
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Request to destroy a local history.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class DestroyLocalHistoryRequest extends LocalHistoryRequest<DestroyLocalHistoryRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+        super(target, replyTo);
+    }
+
+    private DestroyLocalHistoryRequest(final DestroyLocalHistoryRequest request, final ABIVersion version) {
+        super(request, version);
+    }
+
+    @Override
+    protected AbstractLocalHistoryRequestProxy<DestroyLocalHistoryRequest> externalizableProxy(final ABIVersion version) {
+        return new DestroyLocalHistoryRequestProxyV1(this);
+    }
+
+    @Override
+    protected DestroyLocalHistoryRequest cloneAsVersion(final ABIVersion version) {
+        return new DestroyLocalHistoryRequest(this, version);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java
new file mode 100644 (file)
index 0000000..9e953ce
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link DestroyLocalHistoryRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class DestroyLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestProxy<DestroyLocalHistoryRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public DestroyLocalHistoryRequestProxyV1() {
+        // For Externalizable
+    }
+
+    DestroyLocalHistoryRequestProxyV1(final DestroyLocalHistoryRequest request) {
+        super(request);
+    }
+
+    @Override
+    protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+        return new DestroyLocalHistoryRequest(target, replyTo);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailure.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailure.java
new file mode 100644 (file)
index 0000000..795468c
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+
+/**
+ * Generic {@link RequestFailure} involving a {@link LocalHistoryRequest}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class LocalHistoryFailure extends RequestFailure<LocalHistoryIdentifier, LocalHistoryFailure> {
+    private static final long serialVersionUID = 1L;
+
+    LocalHistoryFailure(final LocalHistoryIdentifier target, final RequestException cause) {
+        super(target, cause);
+    }
+
+    @Override
+    protected LocalHistoryFailure cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+
+    @Override
+    protected LocalHistoryFailureProxyV1 externalizableProxy(final ABIVersion version) {
+        return new LocalHistoryFailureProxyV1(this);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java
new file mode 100644 (file)
index 0000000..e8cdf6d
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * Externalizable proxy for use with {@link LocalHistoryFailure}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class LocalHistoryFailureProxyV1 extends AbstractRequestFailureProxy<LocalHistoryIdentifier, LocalHistoryFailure> {
+    private static final long serialVersionUID = 1L;
+
+    public LocalHistoryFailureProxyV1() {
+        // For Externalizable
+    }
+
+    LocalHistoryFailureProxyV1(final LocalHistoryFailure failure) {
+        super(failure);
+    }
+
+    @Override
+    protected LocalHistoryFailure createFailure(final LocalHistoryIdentifier target, final RequestException cause) {
+        return new LocalHistoryFailure(target, cause);
+    }
+
+    @Override
+    protected LocalHistoryIdentifier readTarget(final DataInput in) throws IOException {
+        return LocalHistoryIdentifier.readFrom(in);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java
new file mode 100644 (file)
index 0000000..badb763
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * Abstract base class for {@link Request}s involving specific local history. This class is visible outside of this
+ * package solely for the ability to perform a unified instanceof check.
+ *
+ * @author Robert Varga
+ *
+ * @param <T> Message type
+ */
+@Beta
+public abstract class LocalHistoryRequest<T extends LocalHistoryRequest<T>> extends Request<LocalHistoryIdentifier, T> {
+    private static final long serialVersionUID = 1L;
+
+    LocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+        super(target, replyTo);
+    }
+
+    LocalHistoryRequest(final T request, final ABIVersion version) {
+        super(request, version);
+    }
+
+    @Override
+    public final LocalHistoryFailure toRequestFailure(final RequestException cause) {
+        return new LocalHistoryFailure(getTarget(), cause);
+    }
+
+    @Override
+    protected abstract AbstractLocalHistoryRequestProxy<T> externalizableProxy(final ABIVersion version);
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java
new file mode 100644 (file)
index 0000000..4e588cc
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.AbstractSuccessProxy;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+
+/**
+ * Success class for {@link RequestSuccess}es involving a specific local history.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class LocalHistorySuccess extends RequestSuccess<LocalHistoryIdentifier, LocalHistorySuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public LocalHistorySuccess(final LocalHistoryIdentifier target) {
+        super(target);
+    }
+
+    private LocalHistorySuccess(final LocalHistorySuccess success, final ABIVersion version) {
+        super(success, version);
+    }
+
+    @Override
+    protected LocalHistorySuccess cloneAsVersion(final ABIVersion version) {
+        return new LocalHistorySuccess(this, version);
+    }
+
+    @Override
+    protected AbstractSuccessProxy<LocalHistoryIdentifier, LocalHistorySuccess> externalizableProxy(
+            final ABIVersion version) {
+        return new LocalHistorySuccessProxyV1(this);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java
new file mode 100644 (file)
index 0000000..7806c33
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.AbstractSuccessProxy;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Serialization proxy associated with {@link LocalHistorySuccess}.
+ *
+ * @author Robert Varga
+ */
+final class LocalHistorySuccessProxyV1 extends AbstractSuccessProxy<LocalHistoryIdentifier, LocalHistorySuccess> {
+    private static final long serialVersionUID = 1L;
+
+    LocalHistorySuccessProxyV1() {
+        // For Externalizable
+    }
+
+    LocalHistorySuccessProxyV1(final LocalHistorySuccess success) {
+        super(success);
+    }
+
+    @Override
+    protected final LocalHistoryIdentifier readTarget(final DataInput in) throws IOException {
+        return LocalHistoryIdentifier.readFrom(in);
+    }
+
+    @Override
+    protected LocalHistorySuccess createSuccess(final LocalHistoryIdentifier target) {
+        return new LocalHistorySuccess(target);
+    }
+}
index 6fbc035..57b5050 100644 (file)
@@ -42,7 +42,6 @@ final class ModifyTransactionRequestProxyV1 extends AbstractTransactionRequestPr
         this.protocol = request.getPersistenceProtocol();
     }
 
-
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequest.java
new file mode 100644 (file)
index 0000000..4390d17
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Request to purge a local history. This request is sent by the client once it receives a successful reply to
+ * {@link DestroyLocalHistoryRequest} and indicates it has removed all state attached to a particular local history.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class PurgeLocalHistoryRequest extends LocalHistoryRequest<PurgeLocalHistoryRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+        super(target, replyTo);
+    }
+
+    private PurgeLocalHistoryRequest(final PurgeLocalHistoryRequest request, final ABIVersion version) {
+        super(request, version);
+    }
+
+    @Override
+    protected AbstractLocalHistoryRequestProxy<PurgeLocalHistoryRequest> externalizableProxy(final ABIVersion version) {
+        return new PurgeLocalHistoryRequestProxyV1(this);
+    }
+
+    @Override
+    protected PurgeLocalHistoryRequest cloneAsVersion(final ABIVersion version) {
+        return new PurgeLocalHistoryRequest(this, version);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java
new file mode 100644 (file)
index 0000000..0aaac6e
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link PurgeLocalHistoryRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class PurgeLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestProxy<PurgeLocalHistoryRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public PurgeLocalHistoryRequestProxyV1() {
+        // For Externalizable
+    }
+
+    PurgeLocalHistoryRequestProxyV1(final PurgeLocalHistoryRequest request) {
+        super(request);
+    }
+
+    @Override
+    protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+        return new PurgeLocalHistoryRequest(target, replyTo);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java
new file mode 100644 (file)
index 0000000..a7f132a
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A transaction request to perform the abort step of the three-phase commit protocol.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class TransactionAbortRequest extends TransactionRequest<TransactionAbortRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionAbortRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+        super(target, replyTo);
+    }
+
+    @Override
+    protected TransactionAbortRequestProxyV1 externalizableProxy(final ABIVersion version) {
+        return new TransactionAbortRequestProxyV1(this);
+    }
+
+    @Override
+    protected TransactionAbortRequest cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java
new file mode 100644 (file)
index 0000000..bc1fc58
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionAbortRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionAbortRequestProxyV1 extends AbstractTransactionRequestProxy<TransactionAbortRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionAbortRequestProxyV1() {
+        // For Externalizable
+    }
+
+    TransactionAbortRequestProxyV1(final TransactionAbortRequest request) {
+        super(request);
+    }
+
+    @Override
+    protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+        return new TransactionAbortRequest(target, replyTo);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java
new file mode 100644 (file)
index 0000000..c9625af
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a coordinated commit request initiated by a {@link ModifyTransactionRequest}
+ * or {@link CommitLocalTransactionRequest}.
+ *
+ * @author Robert Varga
+ */
+public final class TransactionAbortSuccess extends TransactionSuccess<TransactionAbortSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionAbortSuccess(final TransactionIdentifier identifier) {
+        super(identifier);
+    }
+
+    @Override
+    protected AbstractTransactionSuccessProxy<TransactionAbortSuccess> externalizableProxy(final ABIVersion version) {
+        return new TransactionAbortSuccessProxyV1(this);
+    }
+
+    @Override
+    protected TransactionAbortSuccess cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java
new file mode 100644 (file)
index 0000000..2c34737
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionAbortSuccess}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionAbortSuccessProxyV1 extends AbstractTransactionSuccessProxy<TransactionAbortSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionAbortSuccessProxyV1() {
+        // For Externalizable
+    }
+
+    TransactionAbortSuccessProxyV1(final TransactionAbortSuccess success) {
+        super(success);
+    }
+
+    @Override
+    protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target) {
+        return new TransactionAbortSuccess(target);
+    }
+}
index e31d75f..c7d4176 100644 (file)
@@ -7,28 +7,20 @@
  */
 package org.opendaylight.controller.cluster.access.commands;
 
-import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 /**
- * Successful reply to a coordinated commit request. It contains a reference to the actor which is handling the commit
- * process.
+ * Successful reply to a coordinated commit request initiated by a {@link ModifyTransactionRequest}
+ * or {@link CommitLocalTransactionRequest}.
  *
  * @author Robert Varga
  */
 public final class TransactionCanCommitSuccess extends TransactionSuccess<TransactionCanCommitSuccess> {
     private static final long serialVersionUID = 1L;
-    private final ActorRef cohort;
 
-    public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final ActorRef cohort) {
+    public TransactionCanCommitSuccess(final TransactionIdentifier identifier) {
         super(identifier);
-        this.cohort = Preconditions.checkNotNull(cohort);
-    }
-
-    public ActorRef getCohort() {
-        return cohort;
     }
 
     @Override
index a6d54f5..a8af4af 100644 (file)
@@ -7,9 +7,6 @@
  */
 package org.opendaylight.controller.cluster.access.commands;
 
-import akka.actor.ActorRef;
-import akka.serialization.JavaSerializer;
-import akka.serialization.Serialization;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -23,7 +20,6 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
  */
 final class TransactionCanCommitSuccessProxyV1 extends AbstractTransactionSuccessProxy<TransactionCanCommitSuccess> {
     private static final long serialVersionUID = 1L;
-    private ActorRef cohort;
 
     public TransactionCanCommitSuccessProxyV1() {
         // For Externalizable
@@ -31,23 +27,20 @@ final class TransactionCanCommitSuccessProxyV1 extends AbstractTransactionSucces
 
     TransactionCanCommitSuccessProxyV1(final TransactionCanCommitSuccess success) {
         super(success);
-        this.cohort = success.getCohort();
     }
 
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {
         super.writeExternal(out);
-        out.writeUTF(Serialization.serializedActorPath(cohort));
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
-        cohort = JavaSerializer.currentSystem().value().provider().resolveActorRef(in.readUTF());
     }
 
     @Override
     protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target) {
-        return new TransactionCanCommitSuccess(target, cohort);
+        return new TransactionCanCommitSuccess(target);
     }
 }
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java
new file mode 100644 (file)
index 0000000..275b5cf
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a coordinated commit request. It contains a reference to the actor which is handling the commit
+ * process.
+ *
+ * @author Robert Varga
+ */
+public final class TransactionCommitSuccess extends TransactionSuccess<TransactionCommitSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionCommitSuccess(final TransactionIdentifier identifier) {
+        super(identifier);
+    }
+
+    @Override
+    protected AbstractTransactionSuccessProxy<TransactionCommitSuccess> externalizableProxy(final ABIVersion version) {
+        return new TransactionCommitSuccessProxyV1(this);
+    }
+
+    @Override
+    protected TransactionCommitSuccess cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java
new file mode 100644 (file)
index 0000000..4628a9d
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionCommitSuccess}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionCommitSuccessProxyV1 extends AbstractTransactionSuccessProxy<TransactionCommitSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionCommitSuccessProxyV1() {
+        // For Externalizable
+    }
+
+    TransactionCommitSuccessProxyV1(final TransactionCommitSuccess success) {
+        super(success);
+    }
+
+    @Override
+    protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target) {
+        return new TransactionCommitSuccess(target);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java
new file mode 100644 (file)
index 0000000..6707aa1
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A transaction request to perform the final, doCommit, step of the three-phase commit protocol.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class TransactionDoCommitRequest extends TransactionRequest<TransactionDoCommitRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionDoCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+        super(target, replyTo);
+    }
+
+    @Override
+    protected TransactionDoCommitRequestProxyV1 externalizableProxy(final ABIVersion version) {
+        return new TransactionDoCommitRequestProxyV1(this);
+    }
+
+    @Override
+    protected TransactionDoCommitRequest cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java
new file mode 100644 (file)
index 0000000..f771844
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionDoCommitRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionDoCommitRequestProxyV1 extends AbstractTransactionRequestProxy<TransactionDoCommitRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionDoCommitRequestProxyV1() {
+        // For Externalizable
+    }
+
+    TransactionDoCommitRequestProxyV1(final TransactionDoCommitRequest request) {
+        super(request);
+    }
+
+    @Override
+    protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+        return new TransactionDoCommitRequest(target, replyTo);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequest.java
new file mode 100644 (file)
index 0000000..2e73f47
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A transaction request to perform the second, preCommit, step of the three-phase commit protocol.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class TransactionPreCommitRequest extends TransactionRequest<TransactionPreCommitRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionPreCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+        super(target, replyTo);
+    }
+
+    @Override
+    protected TransactionPreCommitRequestProxyV1 externalizableProxy(final ABIVersion version) {
+        return new TransactionPreCommitRequestProxyV1(this);
+    }
+
+    @Override
+    protected TransactionPreCommitRequest cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java
new file mode 100644 (file)
index 0000000..dd41e6a
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionPreCommitRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionPreCommitRequestProxyV1 extends AbstractTransactionRequestProxy<TransactionPreCommitRequest> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionPreCommitRequestProxyV1() {
+        // For Externalizable
+    }
+
+    TransactionPreCommitRequestProxyV1(final TransactionPreCommitRequest request) {
+        super(request);
+    }
+
+    @Override
+    protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+        return new TransactionPreCommitRequest(target, replyTo);
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java
new file mode 100644 (file)
index 0000000..8a7da4e
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a {@link TransactionPreCommitRequest}.
+ *
+ * @author Robert Varga
+ */
+public final class TransactionPreCommitSuccess extends TransactionSuccess<TransactionPreCommitSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionPreCommitSuccess(final TransactionIdentifier identifier) {
+        super(identifier);
+    }
+
+    @Override
+    protected AbstractTransactionSuccessProxy<TransactionPreCommitSuccess> externalizableProxy(final ABIVersion version) {
+        return new TransactionPreCommitSuccessProxyV1(this);
+    }
+
+    @Override
+    protected TransactionPreCommitSuccess cloneAsVersion(final ABIVersion version) {
+        return this;
+    }
+}
diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java
new file mode 100644 (file)
index 0000000..2c0cdea
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionPreCommitSuccess}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionPreCommitSuccessProxyV1 extends AbstractTransactionSuccessProxy<TransactionPreCommitSuccess> {
+    private static final long serialVersionUID = 1L;
+
+    public TransactionPreCommitSuccessProxyV1() {
+        // For Externalizable
+    }
+
+    TransactionPreCommitSuccessProxyV1(final TransactionPreCommitSuccess success) {
+        super(success);
+    }
+
+    @Override
+    protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target) {
+        return new TransactionPreCommitSuccess(target);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java
new file mode 100644 (file)
index 0000000..ae845a7
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+
+/**
+ * Abstract base class for concrete {@link DOMStoreTransaction} implementations. It holds a reference to the associated
+ * {@link ClientTransaction}. This abstraction layer is needed to isolate end users, who interact with
+ * {@link DOMStoreTransaction} from the internal implementation.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractShardedTransaction implements DOMStoreTransaction {
+    private final ClientTransaction tx;
+
+    AbstractShardedTransaction(final ClientTransaction tx) {
+        this.tx = Preconditions.checkNotNull(tx);
+    }
+
+    @Override
+    public final Object getIdentifier() {
+        return tx.getIdentifier();
+    }
+
+    final ClientTransaction transaction() {
+        return tx;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java
new file mode 100644 (file)
index 0000000..cadb61a
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Proxy implementation of {@link DOMStoreReadTransaction}. It routes all requests to the backing
+ * {@link ClientTransaction}. This class is not final to allow further subclassing by
+ * {@link ShardedDOMStoreReadWriteTransaction}.
+ *
+ * @author Robert Varga
+ */
+class ShardedDOMStoreReadTransaction extends AbstractShardedTransaction implements DOMStoreReadTransaction {
+    ShardedDOMStoreReadTransaction(final ClientTransaction tx) {
+        super(tx);
+    }
+
+    @Override
+    public final void close() {
+        transaction().abort();
+    }
+
+    @Override
+    public final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+        return transaction().read(path);
+    }
+
+    @Override
+    public final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+        return transaction().exists(path);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java
new file mode 100644 (file)
index 0000000..7172770
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Proxy implementation of {@link DOMStoreReadWriteTransaction}. It routes all requests to the backing
+ * {@link ClientTransaction}.
+ *
+ * @author Robert Varga
+ */
+final class ShardedDOMStoreReadWriteTransaction extends ShardedDOMStoreReadTransaction implements DOMStoreReadWriteTransaction {
+
+    ShardedDOMStoreReadWriteTransaction(final ClientTransaction tx) {
+        super(tx);
+    }
+
+    @Override
+    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        transaction().write(path, data);
+    }
+
+    @Override
+    public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        transaction().merge(path, data);
+    }
+
+    @Override
+    public void delete(final YangInstanceIdentifier path) {
+        transaction().delete(path);
+    }
+
+    @Override
+    public DOMStoreThreePhaseCommitCohort ready() {
+        return transaction().ready();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java
new file mode 100644 (file)
index 0000000..97ed6ba
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+
+/**
+ * Implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}. It wraps
+ * {@link ClientTransaction} into proxies like {@link ShardedDOMStoreReadTransaction} to provide isolation.
+ *
+ * @author Robert Varga
+ */
+final class ShardedDOMStoreTransactionChain implements DOMStoreTransactionChain {
+    private final ClientLocalHistory history;
+
+    ShardedDOMStoreTransactionChain(final ClientLocalHistory history) {
+        this.history = Preconditions.checkNotNull(history);
+    }
+
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction() {
+        return new ShardedDOMStoreReadTransaction(history.createTransaction());
+    }
+
+    @Override
+    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        return new ShardedDOMStoreReadWriteTransaction(history.createTransaction());
+    }
+
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        return new ShardedDOMStoreWriteTransaction(history.createTransaction());
+    }
+
+    @Override
+    public void close() {
+        history.close();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java
new file mode 100644 (file)
index 0000000..6161620
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Proxy implementation of {@link DOMStoreWriteTransaction}. It routes all requests to the backing
+ * {@link ClientTransaction}.
+ *
+ * @author Robert Varga
+ */
+final class ShardedDOMStoreWriteTransaction extends AbstractShardedTransaction implements DOMStoreWriteTransaction {
+    ShardedDOMStoreWriteTransaction(final ClientTransaction tx) {
+        super(tx);
+    }
+
+    @Override
+    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        transaction().write(path, data);
+    }
+
+    @Override
+    public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        transaction().merge(path, data);
+    }
+
+    @Override
+    public void delete(final YangInstanceIdentifier path) {
+        transaction().delete(path);
+    }
+
+    @Override
+    public DOMStoreThreePhaseCommitCohort ready() {
+        return transaction().ready();
+    }
+
+    @Override
+    public final void close() {
+        transaction().abort();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
new file mode 100644 (file)
index 0000000..f364994
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
+ * and the other for single transactions.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
+    static enum State {
+        IDLE,
+        TX_OPEN,
+        CLOSED,
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+    private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
+
+    private final Map<Long, LocalHistoryIdentifier> histories = new ConcurrentHashMap<>();
+    private final DistributedDataStoreClientBehavior client;
+    private final LocalHistoryIdentifier identifier;
+
+    private volatile State state = State.IDLE;
+
+    AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
+        this.client = Preconditions.checkNotNull(client);
+        this.identifier = Preconditions.checkNotNull(identifier);
+        Preconditions.checkArgument(identifier.getCookie() == 0);
+    }
+
+    final State state() {
+        return state;
+    }
+
+    final void updateState(final State expected, final State next) {
+        final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
+        Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
+    }
+
+    final LocalHistoryIdentifier getHistoryForCookie(final Long cookie) {
+        LocalHistoryIdentifier ret = histories.get(cookie);
+        if (ret == null) {
+            ret = new LocalHistoryIdentifier(identifier.getClientId(), identifier.getHistoryId(), cookie);
+            final LocalHistoryIdentifier existing = histories.putIfAbsent(cookie, ret);
+            if (existing != null) {
+                ret = existing;
+            }
+        }
+
+        return ret;
+    }
+
+    @Override
+    public final LocalHistoryIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    final DistributedDataStoreClientBehavior getClient() {
+        return client;
+    }
+
+    @Override
+    final void localAbort(final Throwable cause) {
+        LOG.debug("Force-closing history {}", getIdentifier(), cause);
+        state = State.CLOSED;
+    }
+
+    /**
+     * Callback invoked from {@link ClientTransaction} when a transaction has been sub
+     *
+     * @param transaction Transaction handle
+     */
+    void onTransactionReady(final ClientTransaction transaction) {
+        client.transactionComplete(transaction);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
new file mode 100644 (file)
index 0000000..60919c0
--- /dev/null
@@ -0,0 +1,228 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Class translating transaction operations towards a particular backend shard.
+ *
+ * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
+ * transitions coming from interactions with backend are expected to be thread-safe.
+ *
+ * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
+ * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
+    private final DistributedDataStoreClientBehavior client;
+
+    private long sequence;
+    private boolean sealed;
+
+    AbstractProxyTransaction(final DistributedDataStoreClientBehavior client) {
+        this.client = Preconditions.checkNotNull(client);
+    }
+
+    /**
+     * Instantiate a new tracker for a transaction. This method bases its decision on which implementation to use
+     * based on provided {@link ShardBackendInfo}. If no information is present, it will choose the remote
+     * implementation, which is fine, as the queueing logic in ClientActorBehavior will hold on to the requests until
+     * the backend is located.
+     *
+     * @param client Client behavior
+     * @param historyId Local history identifier
+     * @param transactionId Transaction identifier
+     * @param backend Optional backend identifier
+     * @return A new state tracker
+     */
+    static AbstractProxyTransaction create(final DistributedDataStoreClientBehavior client,
+            final LocalHistoryIdentifier historyId, final long transactionId,
+            final java.util.Optional<ShardBackendInfo> backend) {
+
+        final java.util.Optional<DataTree> dataTree = backend.flatMap(t -> t.getDataTree());
+        final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId);
+        if (dataTree.isPresent()) {
+            return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot());
+        } else {
+            return new RemoteProxyTransaction(client, identifier);
+        }
+    }
+
+    final DistributedDataStoreClientBehavior client() {
+        return client;
+    }
+
+    final long nextSequence() {
+        return sequence++;
+    }
+
+    final void delete(final YangInstanceIdentifier path) {
+        checkSealed();
+        doDelete(path);
+    }
+
+    final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkSealed();
+        doMerge(path, data);
+    }
+
+    final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkSealed();
+        doWrite(path, data);
+    }
+
+    final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+        checkSealed();
+        return doExists(path);
+    }
+
+    final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+        checkSealed();
+        return doRead(path);
+    }
+
+    /**
+     * Seal this transaction before it is either
+     */
+    final void seal() {
+        checkSealed();
+        doSeal();
+        sealed = true;
+    }
+
+    private void checkSealed() {
+        Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
+    }
+
+    /**
+     * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
+     * being sent to the backend.
+     */
+    final void abort() {
+        checkSealed();
+        doAbort();
+    }
+
+    /**
+     * Commit this transaction, possibly in a coordinated fashion.
+     *
+     * @param coordinated True if this transaction should be coordinated across multiple participants.
+     * @return Future completion
+     */
+    final ListenableFuture<Boolean> directCommit() {
+        checkSealed();
+
+        final SettableFuture<Boolean> ret = SettableFuture.create();
+        client().sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(false)), t -> {
+            if (t instanceof TransactionCommitSuccess) {
+                ret.set(Boolean.TRUE);
+            } else if (t instanceof RequestFailure) {
+                ret.setException(((RequestFailure<?, ?>) t).getCause());
+            } else {
+                ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
+            }
+        });
+        return ret;
+    }
+
+    void abort(final VotingFuture<Void> ret) {
+        checkSealed();
+
+        client.sendRequest(nextSequence(), new TransactionAbortRequest(getIdentifier(), client().self()), t -> {
+            if (t instanceof TransactionAbortSuccess) {
+                ret.voteYes();
+            } else if (t instanceof RequestFailure) {
+                ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+            } else {
+                ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+            }
+        });
+    }
+
+    void canCommit(final VotingFuture<?> ret) {
+        checkSealed();
+
+        client.sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(true)), t -> {
+            if (t instanceof TransactionCanCommitSuccess) {
+                ret.voteYes();
+            } else if (t instanceof RequestFailure) {
+                ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+            } else {
+                ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+            }
+        });
+    }
+
+    void preCommit(final VotingFuture<?> ret) {
+        checkSealed();
+
+        client.sendRequest(nextSequence(), new TransactionPreCommitRequest(getIdentifier(), client().self()), t-> {
+            if (t instanceof TransactionPreCommitSuccess) {
+                ret.voteYes();
+            } else if (t instanceof RequestFailure) {
+                ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+            } else {
+                ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+            }
+        });
+    }
+
+    void doCommit(final VotingFuture<?> ret) {
+        checkSealed();
+
+        client.sendRequest(nextSequence(), new TransactionDoCommitRequest(getIdentifier(), client().self()), t-> {
+            if (t instanceof TransactionCommitSuccess) {
+                ret.voteYes();
+            } else if (t instanceof RequestFailure) {
+                ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+            } else {
+                ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+            }
+        });
+    }
+
+    abstract void doDelete(final YangInstanceIdentifier path);
+
+    abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+
+    abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+
+    abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
+
+    abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path);
+
+    abstract void doSeal();
+
+    abstract void doAbort();
+
+    abstract TransactionRequest<?> doCommit(boolean coordinated);
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java
new file mode 100644 (file)
index 0000000..51b1700
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * Base class for internal {@link DOMStoreThreePhaseCommitCohort} implementation. It contains utility constants for
+ * wide reuse.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractTransactionCommitCohort implements DOMStoreThreePhaseCommitCohort {
+    static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+    static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+
+}
index aded49b..8f2ee88 100644 (file)
@@ -7,13 +7,12 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.actor.ActorRef;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 /**
  * Client-side view of a local history. This class tracks all state related to a particular history and routes
@@ -26,41 +25,42 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
  * @author Robert Varga
  */
 @Beta
-public final class ClientLocalHistory implements AutoCloseable {
-    private static final AtomicIntegerFieldUpdater<ClientLocalHistory> STATE_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(ClientLocalHistory.class, "state");
-    private static final int IDLE_STATE = 0;
-    private static final int CLOSED_STATE = 1;
+public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
 
-    private final ClientIdentifier clientId;
-    private final long historyId;
-    private final ActorRef backendActor;
-    private final ActorRef clientActor;
+    private static final AtomicLongFieldUpdater<ClientLocalHistory> NEXT_TX_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(ClientLocalHistory.class, "nextTx");
 
-    private volatile int state = IDLE_STATE;
+    // Used via NEXT_TX_UPDATER
+    @SuppressWarnings("unused")
+    private volatile long nextTx = 0;
 
-    ClientLocalHistory(final DistributedDataStoreClientBehavior client, final long historyId,
-            final ActorRef backendActor) {
-        this.clientActor = client.self();
-        this.backendActor = Preconditions.checkNotNull(backendActor);
-        this.clientId = Verify.verifyNotNull(client.getIdentifier());
-        this.historyId = historyId;
+    ClientLocalHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
+        super(client, historyId);
     }
 
-    private void checkNotClosed() {
-        if (state == CLOSED_STATE) {
-            throw new IllegalStateException("Local history " + new LocalHistoryIdentifier(clientId, historyId) + " is closed");
-        }
+    public ClientTransaction createTransaction() {
+        final State local = state();
+        Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
+        updateState(local, State.TX_OPEN);
+
+        return new ClientTransaction(getClient(), this,
+            new TransactionIdentifier(getIdentifier(), NEXT_TX_UPDATER.getAndIncrement(this)));
     }
 
     @Override
     public void close() {
-        if (STATE_UPDATER.compareAndSet(this, IDLE_STATE, CLOSED_STATE)) {
-            // FIXME: signal close to both client actor and backend actor
-        } else if (state != CLOSED_STATE) {
-            throw new IllegalStateException("Cannot close history with an open transaction");
+        final State local = state();
+        if (local != State.CLOSED) {
+            Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this);
+            updateState(local, State.CLOSED);
         }
     }
 
-    // FIXME: add client requests related to a particular local history
+    @Override
+    void onTransactionReady(final ClientTransaction transaction) {
+        final State local = state();
+        Verify.verify(local == State.TX_OPEN, "Local history %s is in unexpected state %s", this, local);
+        updateState(local, State.IDLE);
+        super.onTransactionReady(transaction);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
new file mode 100644 (file)
index 0000000..f5f545a
--- /dev/null
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client-side view of a free-standing transaction.
+ *
+ * This interface is used by the world outside of the actor system and in the actor system it is manifested via
+ * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to
+ * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor.
+ *
+ * It is internally composed of multiple {@link RemoteProxyTransaction}s, each responsible for a component shard.
+ *
+ * Implementation is quite a bit complex, and involves cooperation with {@link AbstractClientHistory} for tracking
+ * gaps in transaction identifiers seen by backends.
+ *
+ * These gaps need to be accounted for in the transaction setup message sent to a particular backend, so it can verify
+ * that the requested transaction is in-sequence. This is critical in ensuring that transactions (which are independent
+ * entities from message queueing perspective) do not get reodered -- thus allowing multiple in-flight transactions.
+ *
+ * Alternative would be to force visibility by sending an abort request to all potential backends, but that would mean
+ * that even empty transactions increase load on all shards -- which would be a scalability issue.
+ *
+ * Yet another alternative would be to introduce inter-transaction dependencies to the queueing layer in client actor,
+ * but that would require additional indirection and complexity.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ClientTransaction extends LocalAbortable implements Identifiable<TransactionIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientTransaction.class);
+    private static final AtomicIntegerFieldUpdater<ClientTransaction> STATE_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ClientTransaction.class, "state");
+    private static final int OPEN_STATE = 0;
+    private static final int CLOSED_STATE = 1;
+
+    private final Map<Long, AbstractProxyTransaction> proxies = new HashMap<>();
+    private final TransactionIdentifier transactionId;
+    private final AbstractClientHistory parent;
+
+    private volatile int state = OPEN_STATE;
+
+    ClientTransaction(final DistributedDataStoreClientBehavior client, final AbstractClientHistory parent,
+        final TransactionIdentifier transactionId) {
+        this.transactionId = Preconditions.checkNotNull(transactionId);
+        this.parent = Preconditions.checkNotNull(parent);
+    }
+
+    private void checkNotClosed() {
+        Preconditions.checkState(state == OPEN_STATE, "Transaction %s is closed", transactionId);
+    }
+
+    private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) {
+        checkNotClosed();
+
+        final ModuleShardBackendResolver resolver = parent.getClient().resolver();
+        final Long shard = resolver.resolveShardForPath(path);
+        AbstractProxyTransaction ret = proxies.get(shard);
+        if (ret == null) {
+            ret = AbstractProxyTransaction.create(parent.getClient(), parent.getHistoryForCookie(shard),
+                transactionId.getTransactionId(), resolver.getFutureBackendInfo(shard));
+            proxies.put(shard, ret);
+        }
+        return ret;
+    }
+
+    @Override
+    public TransactionIdentifier getIdentifier() {
+        return transactionId;
+    }
+
+    public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+        return ensureProxy(path).exists(path);
+    }
+
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+        return ensureProxy(path).read(path);
+    }
+
+    public void delete(final YangInstanceIdentifier path) {
+        ensureProxy(path).delete(path);
+    }
+
+    public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        ensureProxy(path).merge(path, data);
+    }
+
+    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        ensureProxy(path).write(path, data);
+    }
+
+    private boolean ensureClosed() {
+        final int local = state;
+        if (local != CLOSED_STATE) {
+            final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE);
+            Preconditions.checkState(success, "Transaction %s raced during close", this);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public DOMStoreThreePhaseCommitCohort ready() {
+        Preconditions.checkState(ensureClosed(), "Attempted to submit a closed transaction %s", this);
+
+        for (AbstractProxyTransaction p : proxies.values()) {
+            p.seal();
+        }
+        parent.onTransactionReady(this);
+
+        switch (proxies.size()) {
+            case 0:
+                return EmptyTransactionCommitCohort.INSTANCE;
+            case 1:
+                return new DirectTransactionCommitCohort(Iterables.getOnlyElement(proxies.values()));
+            default:
+                return new ClientTransactionCommitCohort(proxies.values());
+        }
+    }
+
+    /**
+     * Release all state associated with this transaction.
+     */
+    public void abort() {
+        if (ensureClosed()) {
+            for (AbstractProxyTransaction proxy : proxies.values()) {
+                proxy.abort();
+            }
+            proxies.clear();
+        }
+    }
+
+    @Override
+    void localAbort(final Throwable cause) {
+        LOG.debug("Aborting transaction {}", getIdentifier(), cause);
+        abort();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java
new file mode 100644 (file)
index 0000000..2521c38
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.List;
+
+final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohort {
+    private final List<AbstractProxyTransaction> proxies;
+
+    /**
+     * @param clientTransaction
+     */
+    ClientTransactionCommitCohort(final Collection<AbstractProxyTransaction> proxies) {
+        this.proxies = ImmutableList.copyOf(proxies);
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        /*
+         * Issue the request to commit for all participants. We will track the results and report them.
+         */
+        final VotingFuture<Boolean> ret = new VotingFuture<>(Boolean.TRUE, proxies.size());
+        for (AbstractProxyTransaction proxy : proxies) {
+            proxy.canCommit(ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        final VotingFuture<Void> ret = new VotingFuture<>(null, proxies.size());
+        for (AbstractProxyTransaction proxy : proxies) {
+            proxy.preCommit(ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        final VotingFuture<Void> ret = new VotingFuture<>(null, proxies.size());
+        for (AbstractProxyTransaction proxy : proxies) {
+            proxy.doCommit(ret);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        final VotingFuture<Void> ret = new VotingFuture<>(null, proxies.size());
+        for (AbstractProxyTransaction proxy : proxies) {
+            proxy.abort(ret);
+        }
+
+        return ret;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java
new file mode 100644 (file)
index 0000000..007ac53
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * An {@link AbstractTransactionCommitCohort} implementation for transactions which contain a single proxy. Since there
+ * is only one proxy,
+ *
+ * @author Robert Varga
+ */
+final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohort {
+    private final AbstractProxyTransaction proxy;
+
+    /**
+     * @param clientTransaction
+     */
+    DirectTransactionCommitCohort(final AbstractProxyTransaction proxy) {
+        this.proxy = Preconditions.checkNotNull(proxy);
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        return proxy.directCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return VOID_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return VOID_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        return VOID_FUTURE;
+    }
+}
index 82c839e..0d22d56 100644 (file)
@@ -8,7 +8,7 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.annotations.Beta;
-import java.util.concurrent.CompletionStage;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
 
@@ -23,19 +23,22 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 @Beta
 public interface DistributedDataStoreClient extends Identifiable<ClientIdentifier>, AutoCloseable {
     @Override
-    ClientIdentifier getIdentifier();
+    @Nonnull ClientIdentifier getIdentifier();
 
     @Override
     void close();
 
     /**
-     * Create a new local history. This method initiates an asynchronous instantiation of a local history on the back
-     * end. ClientLocalHistory represents the interface exposed to the client.
+     * Create a new local history. ClientLocalHistory represents the interface exposed to the client.
      *
-     * @return Future client history handle
+     * @return Client history handle
      */
-    CompletionStage<ClientLocalHistory> createLocalHistory();
-
-    // TODO: add methods required by DistributedDataStore
+    @Nonnull ClientLocalHistory createLocalHistory();
 
+    /**
+     * Create a new free-standing transaction.
+     *
+     * @return Client transaction handle
+     */
+    @Nonnull ClientTransaction createTransaction();
 }
index 364e462..5aca99a 100644 (file)
@@ -9,9 +9,16 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
+import com.google.common.base.Throwables;
+import com.google.common.base.Verify;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -47,12 +54,19 @@ import org.slf4j.LoggerFactory;
 final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
 
+    private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
+    private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
+    private final AtomicLong nextHistoryId = new AtomicLong(1);
+    private final AtomicLong nextTransactionId = new AtomicLong();
     private final ModuleShardBackendResolver resolver;
-    private long nextHistoryId;
+    private final SingleClientHistory singleHistory;
+
+    private volatile Throwable aborted;
 
     DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
         super(context);
         resolver = new ModuleShardBackendResolver(actorContext);
+        singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
     }
 
     //
@@ -63,26 +77,37 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
 
     @Override
     protected void haltClient(final Throwable cause) {
-        // FIXME: Add state flushing here once we have state
+        // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up
+        // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
+        // thread.
+        if (aborted != null) {
+            abortOperations(cause);
+        }
     }
 
-    private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior,
-            final CompletableFuture<ClientLocalHistory> future) {
-        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
-        LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
+    private void abortOperations(final Throwable cause) {
+        // This acts as a barrier, application threads check this after they have added an entry in the maps,
+        // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
+        aborted = cause;
 
-        // FIXME: initiate backend instantiation
-        future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
-        return currentBehavior;
+        for (ClientLocalHistory h : histories.values()) {
+            h.localAbort(cause);
+        }
+        histories.clear();
+
+        for (ClientTransaction t : transactions.values()) {
+            t.localAbort(cause);
+        }
+        transactions.clear();
     }
 
-    private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) {
-        // FIXME: Add shutdown procedures here
+    private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
+        abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
         return null;
     }
 
     @Override
-    protected ClientActorBehavior onCommand(final Object command) {
+    protected DistributedDataStoreClientBehavior onCommand(final Object command) {
         if (command instanceof GetClientRequest) {
             ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
         } else {
@@ -98,11 +123,41 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
     //
     //
 
+    private static <K, V extends LocalAbortable> V returnIfOperational(final Map<K , V> map, final K key, final V value,
+            final Throwable aborted) {
+        Verify.verify(map.put(key, value) == null);
+
+        if (aborted != null) {
+            try {
+                value.localAbort(aborted);
+            } catch (Exception e) {
+                LOG.debug("Close of {} failed", value, e);
+            }
+            map.remove(key, value);
+            throw Throwables.propagate(aborted);
+        }
+
+        return value;
+    }
+
     @Override
-    public CompletionStage<ClientLocalHistory> createLocalHistory() {
-        final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
-        context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future));
-        return future;
+    public ClientLocalHistory createLocalHistory() {
+        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
+            nextHistoryId.getAndIncrement());
+        final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
+        LOG.debug("{}: creating a new local history {}", persistenceId(), history);
+
+        return returnIfOperational(histories, historyId, history, aborted);
+    }
+
+    @Override
+    public ClientTransaction createTransaction() {
+        final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
+            nextTransactionId.getAndIncrement());
+        final ClientTransaction tx = new ClientTransaction(this, singleHistory, txId);
+        LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
+
+        return returnIfOperational(transactions, txId, tx, aborted);
     }
 
     @Override
@@ -114,4 +169,16 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
     protected ModuleShardBackendResolver resolver() {
         return resolver;
     }
+
+    void transactionComplete(final ClientTransaction transaction) {
+        transactions.remove(transaction.getIdentifier());
+    }
+
+    void sendRequest(final long sequence, final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+        sendRequest(sequence, request, response -> {
+            completer.accept(response);
+            return this;
+        });
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java
new file mode 100644 (file)
index 0000000..7032660
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * An {@link AbstractTransactionCommitCohort} for use with empty transactions. This relies on the fact that no backends
+ * have been touched, hence all state book-keeping needs to happen only locally and shares fate with the coordinator.
+ *
+ * Therefore all methods can finish immediately without any effects.
+ *
+ * @author Robert Varga
+ */
+final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort {
+    static final DOMStoreThreePhaseCommitCohort INSTANCE = new EmptyTransactionCommitCohort();
+
+    private EmptyTransactionCommitCohort() {
+        // Hidden
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        return TRUE_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return VOID_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return VOID_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        return VOID_FUTURE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java
new file mode 100644 (file)
index 0000000..b21b46d
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.function.Supplier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+
+/**
+ * An implementation of DataTreeModification which throws a specified {@link RuntimeException} when any of its methods
+ * are invoked.
+ *
+ * @author Robert Varga
+ */
+final class FailedDataTreeModification implements DataTreeModification {
+    private final Supplier<? extends RuntimeException> supplier;
+
+    FailedDataTreeModification(final Supplier<? extends RuntimeException> supplier) {
+        this.supplier = Preconditions.checkNotNull(supplier);
+    }
+
+    @Override
+    public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
+        throw supplier.get();
+    }
+
+    @Override
+    public DataTreeModification newModification() {
+        throw supplier.get();
+    }
+
+    @Override
+    public void delete(final YangInstanceIdentifier path) {
+        throw supplier.get();
+    }
+
+    @Override
+    public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        throw supplier.get();
+    }
+
+    @Override
+    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        throw supplier.get();
+    }
+
+    @Override
+    public void ready() {
+        throw supplier.get();
+    }
+
+    @Override
+    public void applyToCursor(final DataTreeModificationCursor cursor) {
+        throw supplier.get();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalAbortable.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalAbortable.java
new file mode 100644 (file)
index 0000000..5b0708f
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+/**
+ * Common interface for client histories and client transactions, which can be aborted immediately without replicating
+ * the effect to the backend. This is needed for abrupt shutdowns.
+ *
+ * Since classes which need to expose this functionality do not need a base class, this is an abstract class and not
+ * an interface -- which allows us to not leak the {@link #localAbort(Throwable)} method.
+ *
+ * @author Robert Varga
+ */
+abstract class LocalAbortable {
+    /**
+     * Immediately abort this object.
+     *
+     * @param cause Failure which caused this abort.
+     */
+    abstract void localAbort(Throwable cause);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
new file mode 100644 (file)
index 0000000..1dec846
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with
+ * the client instance.
+ *
+ * It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations
+ * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard
+ * leader.
+ *
+ * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the
+ * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class LocalProxyTransaction extends AbstractProxyTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
+    private static final Consumer<Response<?, ?>> ABORT_COMPLETER = response -> {
+        LOG.debug("Abort completed with {}", response);
+    };
+
+    private final TransactionIdentifier identifier;
+    private DataTreeModification modification;
+
+    LocalProxyTransaction(final DistributedDataStoreClientBehavior client,
+        final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) {
+        super(client);
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.modification = snapshot.newModification();
+    }
+
+    @Override
+    public TransactionIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    void doDelete(final YangInstanceIdentifier path) {
+        modification.delete(path);
+    }
+
+    @Override
+    void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        modification.merge(path, data);
+    }
+
+    @Override
+    void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        modification.write(path, data);
+    }
+
+    @Override
+    CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
+        return Futures.immediateCheckedFuture(modification.readNode(path).isPresent());
+    }
+
+    @Override
+    CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
+        return Futures.immediateCheckedFuture(modification.readNode(path));
+    }
+
+    @Override
+    void doAbort() {
+        client().sendRequest(nextSequence(), new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
+        modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
+    }
+
+    @Override
+    CommitLocalTransactionRequest doCommit(final boolean coordinated) {
+        final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, client().self(),
+            modification, coordinated);
+        modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted"));
+        return ret;
+    }
+
+    @Override
+    void doSeal() {
+        modification.ready();
+    }
+}
index 6f15c72..bef863d 100644 (file)
@@ -7,23 +7,30 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.OnComplete;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableBiMap.Builder;
 import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfo;
 import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.compat.java8.FutureConverters;
+import scala.concurrent.ExecutionContext;
 
 /**
  * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
@@ -33,7 +40,11 @@ import scala.compat.java8.FutureConverters;
  * @author Robert Varga
  */
 final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
+    private static final ExecutionContext DIRECT_EXECUTION_CONTEXT =
+            ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
+    private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
     private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
+
     /**
      * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
      * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
@@ -44,7 +55,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
 
     private final ActorContext actorContext;
 
-    private volatile BiMap<String, Long> shards = ImmutableBiMap.of();
+    @GuardedBy("this")
+    private long nextShard = 1;
+
+    private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
     ModuleShardBackendResolver(final ActorContext actorContext) {
@@ -63,17 +77,49 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
         actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo)result).getShardName());
     }
 
+    Long resolveShardForPath(final YangInstanceIdentifier path) {
+        final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
+        Long cookie = shards.get(shardName);
+        if (cookie == null) {
+            synchronized (this) {
+                cookie = shards.get(shardName);
+                if (cookie == null) {
+                    cookie = nextShard++;
+
+                    Builder<String, Long> b = ImmutableBiMap.builder();
+                    b.putAll(shards);
+                    b.put(shardName, cookie);
+                    shards = b.build();
+                }
+            }
+        }
+
+        return cookie;
+    }
+
     @Override
-    protected CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+    protected CompletableFuture<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
         final String shardName = shards.inverse().get(cookie);
         if (shardName == null) {
             LOG.warn("Failing request for non-existent cookie {}", cookie);
-            return CompletableFuture.completedFuture(null);
+            return NULL_FUTURE;
         }
 
+        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
+
+        actorContext.findPrimaryShardAsync(shardName).onComplete(new OnComplete<PrimaryShardInfo>() {
+            @Override
+            public void onComplete(final Throwable t, final PrimaryShardInfo v) {
+                if (t != null) {
+                    ret.completeExceptionally(t);
+                } else {
+                    ret.complete(createBackendInfo(v, shardName, cookie));
+                }
+            }
+        }, DIRECT_EXECUTION_CONTEXT);
+
         LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
-        return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName))
-                .thenApply(o -> createBackendInfo(o, shardName, cookie));
+        return ret;
     }
 
     private static ABIVersion toABIVersion(final short version) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
new file mode 100644 (file)
index 0000000..13b7fb4
--- /dev/null
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently
+ * not known or is known to be not co-located with the client.
+ *
+ * It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for
+ * maintaining any submitted operations until the leader is discovered.
+ *
+ * This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
+ * transitions based on backend responses are thread-safe.
+ *
+ * @author Robert Varga
+ */
+final class RemoteProxyTransaction extends AbstractProxyTransaction {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
+
+    // FIXME: make this tuneable
+    private static final int REQUEST_MAX_MODIFICATIONS = 1000;
+
+    private final ModifyTransactionRequestBuilder builder;
+
+    private boolean builderBusy;
+
+    private volatile Exception operationFailure;
+
+    RemoteProxyTransaction(final DistributedDataStoreClientBehavior client,
+        final TransactionIdentifier identifier) {
+        super(client);
+        builder = new ModifyTransactionRequestBuilder(identifier, client.self());
+    }
+
+    @Override
+    public TransactionIdentifier getIdentifier() {
+        return builder.getIdentifier();
+    }
+
+    @Override
+    void doDelete(final YangInstanceIdentifier path) {
+        appendModification(new TransactionDelete(path));
+    }
+
+    @Override
+    void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        appendModification(new TransactionMerge(path, data));
+    }
+
+    @Override
+    void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        appendModification(new TransactionWrite(path, data));
+    }
+
+    private <T> CheckedFuture<T, ReadFailedException> sendReadRequest(final AbstractReadTransactionRequest<?> request,
+            final Consumer<Response<?, ?>> completer, final ListenableFuture<T> future) {
+        // Check if a previous operation failed. If it has, do not bother sending anything and report a failure
+        final Exception local = operationFailure;
+        if (local != null) {
+            return Futures.immediateFailedCheckedFuture(new ReadFailedException("Previous operation failed", local));
+        }
+
+        // Make sure we send any modifications before issuing a read
+        ensureFlushedBuider();
+        client().sendRequest(nextSequence(), request, completer);
+        return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
+    }
+
+    @Override
+    CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
+        final SettableFuture<Boolean> future = SettableFuture.create();
+        return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), client().self(), path),
+            t -> completeExists(future, t), future);
+    }
+
+    @Override
+    CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
+        return sendReadRequest(new ReadTransactionRequest(getIdentifier(), client().self(), path),
+            t -> completeRead(future, t), future);
+    }
+
+    @Override
+    void doAbort() {
+        ensureInitializedBuider();
+        builder.setAbort();
+        flushBuilder();
+    }
+
+    private void ensureInitializedBuider() {
+        if (!builderBusy) {
+            builderBusy = true;
+        }
+    }
+
+    private void ensureFlushedBuider() {
+        if (builderBusy) {
+            flushBuilder();
+        }
+    }
+
+    private void flushBuilder() {
+        client().sendRequest(nextSequence(), builder.build(), this::completeModify);
+        builderBusy = false;
+    }
+
+    private void appendModification(final TransactionModification modification) {
+        if (operationFailure == null) {
+            ensureInitializedBuider();
+
+            builder.addModification(modification);
+            if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
+                flushBuilder();
+            }
+        } else {
+            LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
+        }
+    }
+
+    private void completeModify(final Response<?, ?> response) {
+        LOG.debug("Modification request completed with {}", response);
+
+        if (response instanceof TransactionSuccess) {
+            // Happy path no-op
+        } else {
+            recordFailedResponse(response);
+        }
+    }
+
+    private Exception recordFailedResponse(final Response<?, ?> response) {
+        final Exception failure;
+        if (response instanceof RequestFailure) {
+            failure = ((RequestFailure<?, ?>) response).getCause();
+        } else {
+            LOG.warn("Unhandled response {}", response);
+            failure = new IllegalArgumentException("Unhandled response " + response.getClass());
+        }
+
+        if (operationFailure == null) {
+            LOG.debug("Transaction {} failed", getIdentifier(), failure);
+            operationFailure = failure;
+        }
+        return failure;
+    }
+
+    private void failFuture(final SettableFuture<?> future, final Response<?, ?> response) {
+        future.setException(recordFailedResponse(response));
+    }
+
+    private void completeExists(final SettableFuture<Boolean> future, final Response<?, ?> response) {
+        LOG.debug("Exists request completed with {}", response);
+
+        if (response instanceof ExistsTransactionSuccess) {
+            future.set(((ExistsTransactionSuccess) response).getExists());
+        } else {
+            failFuture(future, response);
+        }
+    }
+
+    private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future, final Response<?, ?> response) {
+        LOG.debug("Read request completed with {}", response);
+
+        if (response instanceof ReadTransactionSuccess) {
+            future.set(((ReadTransactionSuccess) response).getData());
+        } else {
+            failFuture(future, response);
+        }
+    }
+
+    @Override
+    ModifyTransactionRequest doCommit(final boolean coordinated) {
+        ensureInitializedBuider();
+        builder.setCommit(coordinated);
+
+        final ModifyTransactionRequest ret = builder.build();
+        builderBusy = false;
+        return ret;
+    }
+
+    @Override
+    void doSeal() {
+        // No-op
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java
new file mode 100644 (file)
index 0000000..b57e9b6
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * An {@link AbstractClientHistory} which handles free-standing transactions.
+ *
+ * @author Robert Varga
+ */
+final class SingleClientHistory extends AbstractClientHistory {
+    protected SingleClientHistory(final DistributedDataStoreClientBehavior client,
+            final LocalHistoryIdentifier identifier) {
+        super(client, identifier);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/VotingFuture.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/VotingFuture.java
new file mode 100644 (file)
index 0000000..b6aba31
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.AbstractFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * An {@link AbstractFuture} implementation which requires a certain number of votes before it completes. If all votes
+ * are 'yes', then it completes with a pre-determined value. If any of the votes are 'no', the future completes with
+ * an exception. This exception corresponds to the cause reported by the first 'no' vote, with all subsequent votes
+ * added as suppressed exceptions.
+ *
+ * Implementation is geared toward positive votes. Negative votes have to synchronize and therefore are more likely
+ * to see contention.
+ *
+ * @author Robert Varga
+ *
+ * @param <T> Type of value returned on success
+ */
+class VotingFuture<T> extends AbstractFuture<T> {
+    @SuppressWarnings("rawtypes")
+    private static final AtomicIntegerFieldUpdater<VotingFuture> VOTES_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(VotingFuture.class, "neededVotes");
+
+    private final T result;
+
+    @GuardedBy("failures")
+    private final Collection<Throwable> failures = new ArrayList<>(0);
+    @SuppressWarnings("unused")
+    private volatile int neededVotes;
+
+    VotingFuture(final T result, final int requiredVotes) {
+        Preconditions.checkArgument(requiredVotes > 0);
+        this.neededVotes = requiredVotes;
+
+        // null is okay to allow Void type
+        this.result = result;
+    }
+
+    void voteYes() {
+        if (castVote()) {
+            synchronized (failures) {
+                resolveResult();
+             }
+        }
+    }
+
+    void voteNo(final Throwable cause) {
+        synchronized (failures) {
+            failures.add(cause);
+            if (castVote()) {
+                resolveResult();
+            }
+        }
+    }
+
+    private boolean castVote() {
+        final int votes = VOTES_UPDATER.decrementAndGet(this);
+        Verify.verify(votes >= 0);
+        return votes == 0;
+    }
+
+    @GuardedBy("failures")
+    private void resolveResult() {
+        final Iterator<Throwable> it = failures.iterator();
+        if (!it.hasNext()) {
+            set(result);
+            return;
+        }
+
+        final Throwable t = it.next();
+        while (it.hasNext()) {
+            t.addSuppressed(it.next());
+        }
+
+        setException(t);
+    }
+}
index 2868998..b2fbc6b 100644 (file)
@@ -9,9 +9,13 @@ package org.opendaylight.controller.cluster.datastore.actors.client;
 
 import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import org.slf4j.Logger;
@@ -28,12 +32,26 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public abstract class BackendInfoResolver<T extends BackendInfo> {
     private static final Logger LOG = LoggerFactory.getLogger(BackendInfoResolver.class);
-    private final ConcurrentMap<Long, CompletionStage<T>> backends = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, CompletableFuture<T>> backends = new ConcurrentHashMap<>();
 
-    // This is what the client needs to start processing. For as long as we do not have this, we should not complete
-    // this stage until we have this information
-    public final CompletionStage<? extends T> getBackendInfo(final long cookie) {
-        return backends.computeIfAbsent(cookie, this::resolveBackendInfo);
+    /**
+     * Return the currently-resolved backend information, if available. This method is guaranteed not to block, but will
+     * initiate resolution of the information if there is none.
+     *
+     * @param cookie Backend cookie
+     * @return Backend information, if available
+     */
+    public final Optional<T> getFutureBackendInfo(final Long cookie) {
+        final Future<T> f = lookupBackend(cookie);
+        if (f.isDone()) {
+            try {
+                return Optional.of(f.get());
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.debug("Resolution of {} failed", f, e);
+            }
+        }
+
+        return Optional.empty();
     }
 
     /**
@@ -55,9 +73,9 @@ public abstract class BackendInfoResolver<T extends BackendInfo> {
      * requests information which is not currently cached.
      *
      * @param cookie Backend cookie
-     * @return A {@link CompletionStage} resulting in information about the backend
+     * @return A {@link CompletableFuture} resulting in information about the backend
      */
-    protected abstract @Nonnull CompletionStage<T> resolveBackendInfo(final @Nonnull Long cookie);
+    protected abstract @Nonnull CompletableFuture<T> resolveBackendInfo(final @Nonnull Long cookie);
 
     /**
      * Invalidate previously-resolved shard information. This method is invoked when a timeout is detected
@@ -66,4 +84,14 @@ public abstract class BackendInfoResolver<T extends BackendInfo> {
      * @param info Previous promise of backend information
      */
     protected abstract void invalidateBackendInfo(@Nonnull CompletionStage<? extends BackendInfo> info);
+
+    // This is what the client needs to start processing. For as long as we do not have this, we should not complete
+    // this stage until we have this information
+    final CompletionStage<? extends T> getBackendInfo(final Long cookie) {
+        return lookupBackend(cookie);
+    }
+
+    private CompletableFuture<T> lookupBackend(final Long cookie) {
+        return backends.computeIfAbsent(Preconditions.checkNotNull(cookie), this::resolveBackendInfo);
+    }
 }