From 98d1c5606bad9633ce5549bcd691a98c75abdf6a Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 Jun 2016 14:59:14 +0200 Subject: [PATCH] BUG-5280: implement transaction dispatch This patch adds the DOMStore interface in DistributedDataStoreClient and defines the missing messages. Change-Id: I6b0905fb97e3269c12a5cd8f2c681e4caeb14e3e Signed-off-by: Robert Varga --- .../AbstractLocalHistoryRequestProxy.java | 37 +++ .../commands/CreateLocalHistoryRequest.java | 41 ++++ .../CreateLocalHistoryRequestProxyV1.java | 34 +++ .../access/commands/DeadHistoryException.java | 31 +++ .../commands/DestroyLocalHistoryRequest.java | 41 ++++ .../DestroyLocalHistoryRequestProxyV1.java | 34 +++ .../access/commands/LocalHistoryFailure.java | 38 +++ .../commands/LocalHistoryFailureProxyV1.java | 42 ++++ .../access/commands/LocalHistoryRequest.java | 44 ++++ .../access/commands/LocalHistorySuccess.java | 43 ++++ .../commands/LocalHistorySuccessProxyV1.java | 40 +++ .../ModifyTransactionRequestProxyV1.java | 1 - .../commands/PurgeLocalHistoryRequest.java | 42 ++++ .../PurgeLocalHistoryRequestProxyV1.java | 34 +++ .../commands/TransactionAbortRequest.java | 37 +++ .../TransactionAbortRequestProxyV1.java | 34 +++ .../commands/TransactionAbortSuccess.java | 35 +++ .../TransactionAbortSuccessProxyV1.java | 33 +++ .../commands/TransactionCanCommitSuccess.java | 14 +- .../TransactionCanCommitSuccessProxyV1.java | 9 +- .../commands/TransactionCommitSuccess.java | 35 +++ .../TransactionCommitSuccessProxyV1.java | 33 +++ .../commands/TransactionDoCommitRequest.java | 37 +++ .../TransactionDoCommitRequestProxyV1.java | 34 +++ .../commands/TransactionPreCommitRequest.java | 37 +++ .../TransactionPreCommitRequestProxyV1.java | 34 +++ .../commands/TransactionPreCommitSuccess.java | 34 +++ .../TransactionPreCommitSuccessProxyV1.java | 33 +++ .../AbstractShardedTransaction.java | 36 +++ .../ShardedDOMStoreReadTransaction.java | 44 ++++ .../ShardedDOMStoreReadWriteTransaction.java | 47 ++++ .../ShardedDOMStoreTransactionChain.java | 50 ++++ .../ShardedDOMStoreWriteTransaction.java | 51 ++++ .../actors/dds/AbstractClientHistory.java | 93 +++++++ .../actors/dds/AbstractProxyTransaction.java | 228 ++++++++++++++++++ .../dds/AbstractTransactionCommitCohort.java | 24 ++ .../actors/dds/ClientLocalHistory.java | 56 ++--- .../actors/dds/ClientTransaction.java | 160 ++++++++++++ .../dds/ClientTransactionCommitCohort.java | 67 +++++ .../dds/DirectTransactionCommitCohort.java | 48 ++++ .../dds/DistributedDataStoreClient.java | 19 +- .../DistributedDataStoreClientBehavior.java | 103 ++++++-- .../dds/EmptyTransactionCommitCohort.java | 47 ++++ .../dds/FailedDataTreeModification.java | 65 +++++ .../databroker/actors/dds/LocalAbortable.java | 26 ++ .../actors/dds/LocalProxyTransaction.java | 106 ++++++++ .../dds/ModuleShardBackendResolver.java | 58 ++++- .../actors/dds/RemoteProxyTransaction.java | 217 +++++++++++++++++ .../actors/dds/SingleClientHistory.java | 22 ++ .../databroker/actors/dds/VotingFuture.java | 90 +++++++ .../actors/client/BackendInfoResolver.java | 42 +++- .../actors/client/ClientActorContext.java | 1 - 52 files changed, 2553 insertions(+), 88 deletions(-) create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalHistoryRequestProxy.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequestProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailure.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequest.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalAbortable.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/VotingFuture.java diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalHistoryRequestProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalHistoryRequestProxy.java new file mode 100644 index 0000000000..0fbeecc023 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractLocalHistoryRequestProxy.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import java.io.DataInput; +import java.io.IOException; +import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * Abstract base class for serialization proxies associated with {@link LocalHistoryRequest}s. + * + * @author Robert Varga + * + * @param Message type + */ +abstract class AbstractLocalHistoryRequestProxy> extends AbstractRequestProxy { + private static final long serialVersionUID = 1L; + + AbstractLocalHistoryRequestProxy() { + // For Externalizable + } + + AbstractLocalHistoryRequestProxy(final T request) { + super(request); + } + + @Override + protected final LocalHistoryIdentifier readTarget(final DataInput in) throws IOException { + return LocalHistoryIdentifier.readFrom(in); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java new file mode 100644 index 0000000000..28a77c4426 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequest.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * Request to create a new local history. + * + * @author Robert Varga + */ +@Beta +public final class CreateLocalHistoryRequest extends LocalHistoryRequest { + private static final long serialVersionUID = 1L; + + public CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { + super(target, replyTo); + } + + private CreateLocalHistoryRequest(final CreateLocalHistoryRequest request, final ABIVersion version) { + super(request, version); + } + + @Override + protected AbstractLocalHistoryRequestProxy externalizableProxy(final ABIVersion version) { + return new CreateLocalHistoryRequestProxyV1(this); + } + + @Override + protected CreateLocalHistoryRequest cloneAsVersion(final ABIVersion version) { + return new CreateLocalHistoryRequest(this, version); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequestProxyV1.java new file mode 100644 index 0000000000..234e477ee1 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CreateLocalHistoryRequestProxyV1.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * Externalizable proxy for use with {@link CreateLocalHistoryRequest}. It implements the initial (Boron) serialization + * format. + * + * @author Robert Varga + */ +final class CreateLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestProxy { + private static final long serialVersionUID = 1L; + + public CreateLocalHistoryRequestProxyV1() { + // For Externalizable + } + + CreateLocalHistoryRequestProxyV1(final CreateLocalHistoryRequest request) { + super(request); + } + + @Override + protected CreateLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { + return new CreateLocalHistoryRequest(target, replyTo); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java new file mode 100644 index 0000000000..bf86c8b996 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadHistoryException.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +/** + * A {@link RequestException} indicating that the backend has received a request to create a history which has already + * been retired. + * + * @author Robert Varga + */ +@Beta +public final class DeadHistoryException extends RequestException { + private static final long serialVersionUID = 1L; + + public DeadHistoryException(final long lastSeenHistory) { + super("Histories up to " + Long.toUnsignedString(lastSeenHistory) + " are accounted for"); + } + + @Override + public boolean isRetriable() { + return true; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequest.java new file mode 100644 index 0000000000..1997ddd74a --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequest.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * Request to destroy a local history. + * + * @author Robert Varga + */ +@Beta +public final class DestroyLocalHistoryRequest extends LocalHistoryRequest { + private static final long serialVersionUID = 1L; + + public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { + super(target, replyTo); + } + + private DestroyLocalHistoryRequest(final DestroyLocalHistoryRequest request, final ABIVersion version) { + super(request, version); + } + + @Override + protected AbstractLocalHistoryRequestProxy externalizableProxy(final ABIVersion version) { + return new DestroyLocalHistoryRequestProxyV1(this); + } + + @Override + protected DestroyLocalHistoryRequest cloneAsVersion(final ABIVersion version) { + return new DestroyLocalHistoryRequest(this, version); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java new file mode 100644 index 0000000000..9e953ceb21 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DestroyLocalHistoryRequestProxyV1.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * Externalizable proxy for use with {@link DestroyLocalHistoryRequest}. It implements the initial (Boron) serialization + * format. + * + * @author Robert Varga + */ +final class DestroyLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestProxy { + private static final long serialVersionUID = 1L; + + public DestroyLocalHistoryRequestProxyV1() { + // For Externalizable + } + + DestroyLocalHistoryRequestProxyV1(final DestroyLocalHistoryRequest request) { + super(request); + } + + @Override + protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { + return new DestroyLocalHistoryRequest(target, replyTo); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailure.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailure.java new file mode 100644 index 0000000000..795468c03e --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailure.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.RequestFailure; + +/** + * Generic {@link RequestFailure} involving a {@link LocalHistoryRequest}. + * + * @author Robert Varga + */ +@Beta +public final class LocalHistoryFailure extends RequestFailure { + private static final long serialVersionUID = 1L; + + LocalHistoryFailure(final LocalHistoryIdentifier target, final RequestException cause) { + super(target, cause); + } + + @Override + protected LocalHistoryFailure cloneAsVersion(final ABIVersion version) { + return this; + } + + @Override + protected LocalHistoryFailureProxyV1 externalizableProxy(final ABIVersion version) { + return new LocalHistoryFailureProxyV1(this); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java new file mode 100644 index 0000000000..e8cdf6d19f --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryFailureProxyV1.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import java.io.DataInput; +import java.io.IOException; +import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +/** + * Externalizable proxy for use with {@link LocalHistoryFailure}. It implements the initial (Boron) serialization + * format. + * + * @author Robert Varga + */ +final class LocalHistoryFailureProxyV1 extends AbstractRequestFailureProxy { + private static final long serialVersionUID = 1L; + + public LocalHistoryFailureProxyV1() { + // For Externalizable + } + + LocalHistoryFailureProxyV1(final LocalHistoryFailure failure) { + super(failure); + } + + @Override + protected LocalHistoryFailure createFailure(final LocalHistoryIdentifier target, final RequestException cause) { + return new LocalHistoryFailure(target, cause); + } + + @Override + protected LocalHistoryIdentifier readTarget(final DataInput in) throws IOException { + return LocalHistoryIdentifier.readFrom(in); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java new file mode 100644 index 0000000000..badb763629 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +/** + * Abstract base class for {@link Request}s involving specific local history. This class is visible outside of this + * package solely for the ability to perform a unified instanceof check. + * + * @author Robert Varga + * + * @param Message type + */ +@Beta +public abstract class LocalHistoryRequest> extends Request { + private static final long serialVersionUID = 1L; + + LocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { + super(target, replyTo); + } + + LocalHistoryRequest(final T request, final ABIVersion version) { + super(request, version); + } + + @Override + public final LocalHistoryFailure toRequestFailure(final RequestException cause) { + return new LocalHistoryFailure(getTarget(), cause); + } + + @Override + protected abstract AbstractLocalHistoryRequestProxy externalizableProxy(final ABIVersion version); +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java new file mode 100644 index 0000000000..4e588cc200 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccess.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.AbstractSuccessProxy; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; + +/** + * Success class for {@link RequestSuccess}es involving a specific local history. + * + * @author Robert Varga + */ +@Beta +public final class LocalHistorySuccess extends RequestSuccess { + private static final long serialVersionUID = 1L; + + public LocalHistorySuccess(final LocalHistoryIdentifier target) { + super(target); + } + + private LocalHistorySuccess(final LocalHistorySuccess success, final ABIVersion version) { + super(success, version); + } + + @Override + protected LocalHistorySuccess cloneAsVersion(final ABIVersion version) { + return new LocalHistorySuccess(this, version); + } + + @Override + protected AbstractSuccessProxy externalizableProxy( + final ABIVersion version) { + return new LocalHistorySuccessProxyV1(this); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java new file mode 100644 index 0000000000..7806c33354 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistorySuccessProxyV1.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import java.io.DataInput; +import java.io.IOException; +import org.opendaylight.controller.cluster.access.concepts.AbstractSuccessProxy; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * Serialization proxy associated with {@link LocalHistorySuccess}. + * + * @author Robert Varga + */ +final class LocalHistorySuccessProxyV1 extends AbstractSuccessProxy { + private static final long serialVersionUID = 1L; + + LocalHistorySuccessProxyV1() { + // For Externalizable + } + + LocalHistorySuccessProxyV1(final LocalHistorySuccess success) { + super(success); + } + + @Override + protected final LocalHistoryIdentifier readTarget(final DataInput in) throws IOException { + return LocalHistoryIdentifier.readFrom(in); + } + + @Override + protected LocalHistorySuccess createSuccess(final LocalHistoryIdentifier target) { + return new LocalHistorySuccess(target); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java index 6fbc035036..57b5050576 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java @@ -42,7 +42,6 @@ final class ModifyTransactionRequestProxyV1 extends AbstractTransactionRequestPr this.protocol = request.getPersistenceProtocol(); } - @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequest.java new file mode 100644 index 0000000000..4390d17e5d --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequest.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * Request to purge a local history. This request is sent by the client once it receives a successful reply to + * {@link DestroyLocalHistoryRequest} and indicates it has removed all state attached to a particular local history. + * + * @author Robert Varga + */ +@Beta +public final class PurgeLocalHistoryRequest extends LocalHistoryRequest { + private static final long serialVersionUID = 1L; + + public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { + super(target, replyTo); + } + + private PurgeLocalHistoryRequest(final PurgeLocalHistoryRequest request, final ABIVersion version) { + super(request, version); + } + + @Override + protected AbstractLocalHistoryRequestProxy externalizableProxy(final ABIVersion version) { + return new PurgeLocalHistoryRequestProxyV1(this); + } + + @Override + protected PurgeLocalHistoryRequest cloneAsVersion(final ABIVersion version) { + return new PurgeLocalHistoryRequest(this, version); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java new file mode 100644 index 0000000000..0aaac6e32a --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PurgeLocalHistoryRequestProxyV1.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * Externalizable proxy for use with {@link PurgeLocalHistoryRequest}. It implements the initial (Boron) serialization + * format. + * + * @author Robert Varga + */ +final class PurgeLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestProxy { + private static final long serialVersionUID = 1L; + + public PurgeLocalHistoryRequestProxyV1() { + // For Externalizable + } + + PurgeLocalHistoryRequestProxyV1(final PurgeLocalHistoryRequest request) { + super(request); + } + + @Override + protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) { + return new PurgeLocalHistoryRequest(target, replyTo); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java new file mode 100644 index 0000000000..a7f132a609 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequest.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * A transaction request to perform the abort step of the three-phase commit protocol. + * + * @author Robert Varga + */ +@Beta +public final class TransactionAbortRequest extends TransactionRequest { + private static final long serialVersionUID = 1L; + + public TransactionAbortRequest(final TransactionIdentifier target, final ActorRef replyTo) { + super(target, replyTo); + } + + @Override + protected TransactionAbortRequestProxyV1 externalizableProxy(final ABIVersion version) { + return new TransactionAbortRequestProxyV1(this); + } + + @Override + protected TransactionAbortRequest cloneAsVersion(final ABIVersion version) { + return this; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java new file mode 100644 index 0000000000..bc1fc582f4 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortRequestProxyV1.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Externalizable proxy for use with {@link TransactionAbortRequest}. It implements the initial (Boron) serialization + * format. + * + * @author Robert Varga + */ +final class TransactionAbortRequestProxyV1 extends AbstractTransactionRequestProxy { + private static final long serialVersionUID = 1L; + + public TransactionAbortRequestProxyV1() { + // For Externalizable + } + + TransactionAbortRequestProxyV1(final TransactionAbortRequest request) { + super(request); + } + + @Override + protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) { + return new TransactionAbortRequest(target, replyTo); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java new file mode 100644 index 0000000000..c9625afa68 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccess.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Successful reply to a coordinated commit request initiated by a {@link ModifyTransactionRequest} + * or {@link CommitLocalTransactionRequest}. + * + * @author Robert Varga + */ +public final class TransactionAbortSuccess extends TransactionSuccess { + private static final long serialVersionUID = 1L; + + public TransactionAbortSuccess(final TransactionIdentifier identifier) { + super(identifier); + } + + @Override + protected AbstractTransactionSuccessProxy externalizableProxy(final ABIVersion version) { + return new TransactionAbortSuccessProxyV1(this); + } + + @Override + protected TransactionAbortSuccess cloneAsVersion(final ABIVersion version) { + return this; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java new file mode 100644 index 0000000000..2c347371e3 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionAbortSuccessProxyV1.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Externalizable proxy for use with {@link TransactionAbortSuccess}. It implements the initial (Boron) + * serialization format. + * + * @author Robert Varga + */ +final class TransactionAbortSuccessProxyV1 extends AbstractTransactionSuccessProxy { + private static final long serialVersionUID = 1L; + + public TransactionAbortSuccessProxyV1() { + // For Externalizable + } + + TransactionAbortSuccessProxyV1(final TransactionAbortSuccess success) { + super(success); + } + + @Override + protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target) { + return new TransactionAbortSuccess(target); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java index e31d75f077..c7d417626a 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccess.java @@ -7,28 +7,20 @@ */ package org.opendaylight.controller.cluster.access.commands; -import akka.actor.ActorRef; -import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; /** - * Successful reply to a coordinated commit request. It contains a reference to the actor which is handling the commit - * process. + * Successful reply to a coordinated commit request initiated by a {@link ModifyTransactionRequest} + * or {@link CommitLocalTransactionRequest}. * * @author Robert Varga */ public final class TransactionCanCommitSuccess extends TransactionSuccess { private static final long serialVersionUID = 1L; - private final ActorRef cohort; - public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final ActorRef cohort) { + public TransactionCanCommitSuccess(final TransactionIdentifier identifier) { super(identifier); - this.cohort = Preconditions.checkNotNull(cohort); - } - - public ActorRef getCohort() { - return cohort; } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java index a6d54f5036..a8af4af2c1 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCanCommitSuccessProxyV1.java @@ -7,9 +7,6 @@ */ package org.opendaylight.controller.cluster.access.commands; -import akka.actor.ActorRef; -import akka.serialization.JavaSerializer; -import akka.serialization.Serialization; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -23,7 +20,6 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier */ final class TransactionCanCommitSuccessProxyV1 extends AbstractTransactionSuccessProxy { private static final long serialVersionUID = 1L; - private ActorRef cohort; public TransactionCanCommitSuccessProxyV1() { // For Externalizable @@ -31,23 +27,20 @@ final class TransactionCanCommitSuccessProxyV1 extends AbstractTransactionSucces TransactionCanCommitSuccessProxyV1(final TransactionCanCommitSuccess success) { super(success); - this.cohort = success.getCohort(); } @Override public void writeExternal(final ObjectOutput out) throws IOException { super.writeExternal(out); - out.writeUTF(Serialization.serializedActorPath(cohort)); } @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); - cohort = JavaSerializer.currentSystem().value().provider().resolveActorRef(in.readUTF()); } @Override protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target) { - return new TransactionCanCommitSuccess(target, cohort); + return new TransactionCanCommitSuccess(target); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java new file mode 100644 index 0000000000..275b5cf1e8 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccess.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Successful reply to a coordinated commit request. It contains a reference to the actor which is handling the commit + * process. + * + * @author Robert Varga + */ +public final class TransactionCommitSuccess extends TransactionSuccess { + private static final long serialVersionUID = 1L; + + public TransactionCommitSuccess(final TransactionIdentifier identifier) { + super(identifier); + } + + @Override + protected AbstractTransactionSuccessProxy externalizableProxy(final ABIVersion version) { + return new TransactionCommitSuccessProxyV1(this); + } + + @Override + protected TransactionCommitSuccess cloneAsVersion(final ABIVersion version) { + return this; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java new file mode 100644 index 0000000000..4628a9d7b6 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionCommitSuccessProxyV1.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Externalizable proxy for use with {@link TransactionCommitSuccess}. It implements the initial (Boron) + * serialization format. + * + * @author Robert Varga + */ +final class TransactionCommitSuccessProxyV1 extends AbstractTransactionSuccessProxy { + private static final long serialVersionUID = 1L; + + public TransactionCommitSuccessProxyV1() { + // For Externalizable + } + + TransactionCommitSuccessProxyV1(final TransactionCommitSuccess success) { + super(success); + } + + @Override + protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target) { + return new TransactionCommitSuccess(target); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java new file mode 100644 index 0000000000..6707aa199d --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequest.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * A transaction request to perform the final, doCommit, step of the three-phase commit protocol. + * + * @author Robert Varga + */ +@Beta +public final class TransactionDoCommitRequest extends TransactionRequest { + private static final long serialVersionUID = 1L; + + public TransactionDoCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) { + super(target, replyTo); + } + + @Override + protected TransactionDoCommitRequestProxyV1 externalizableProxy(final ABIVersion version) { + return new TransactionDoCommitRequestProxyV1(this); + } + + @Override + protected TransactionDoCommitRequest cloneAsVersion(final ABIVersion version) { + return this; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java new file mode 100644 index 0000000000..f7718446dc --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionDoCommitRequestProxyV1.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Externalizable proxy for use with {@link TransactionDoCommitRequest}. It implements the initial (Boron) serialization + * format. + * + * @author Robert Varga + */ +final class TransactionDoCommitRequestProxyV1 extends AbstractTransactionRequestProxy { + private static final long serialVersionUID = 1L; + + public TransactionDoCommitRequestProxyV1() { + // For Externalizable + } + + TransactionDoCommitRequestProxyV1(final TransactionDoCommitRequest request) { + super(request); + } + + @Override + protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) { + return new TransactionDoCommitRequest(target, replyTo); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequest.java new file mode 100644 index 0000000000..2e73f47772 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequest.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * A transaction request to perform the second, preCommit, step of the three-phase commit protocol. + * + * @author Robert Varga + */ +@Beta +public final class TransactionPreCommitRequest extends TransactionRequest { + private static final long serialVersionUID = 1L; + + public TransactionPreCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) { + super(target, replyTo); + } + + @Override + protected TransactionPreCommitRequestProxyV1 externalizableProxy(final ABIVersion version) { + return new TransactionPreCommitRequestProxyV1(this); + } + + @Override + protected TransactionPreCommitRequest cloneAsVersion(final ABIVersion version) { + return this; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java new file mode 100644 index 0000000000..dd41e6ab3a --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitRequestProxyV1.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Externalizable proxy for use with {@link TransactionPreCommitRequest}. It implements the initial (Boron) serialization + * format. + * + * @author Robert Varga + */ +final class TransactionPreCommitRequestProxyV1 extends AbstractTransactionRequestProxy { + private static final long serialVersionUID = 1L; + + public TransactionPreCommitRequestProxyV1() { + // For Externalizable + } + + TransactionPreCommitRequestProxyV1(final TransactionPreCommitRequest request) { + super(request); + } + + @Override + protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) { + return new TransactionPreCommitRequest(target, replyTo); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java new file mode 100644 index 0000000000..8a7da4e61b --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccess.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Successful reply to a {@link TransactionPreCommitRequest}. + * + * @author Robert Varga + */ +public final class TransactionPreCommitSuccess extends TransactionSuccess { + private static final long serialVersionUID = 1L; + + public TransactionPreCommitSuccess(final TransactionIdentifier identifier) { + super(identifier); + } + + @Override + protected AbstractTransactionSuccessProxy externalizableProxy(final ABIVersion version) { + return new TransactionPreCommitSuccessProxyV1(this); + } + + @Override + protected TransactionPreCommitSuccess cloneAsVersion(final ABIVersion version) { + return this; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java new file mode 100644 index 0000000000..2c0cdea17e --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionPreCommitSuccessProxyV1.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Externalizable proxy for use with {@link TransactionPreCommitSuccess}. It implements the initial (Boron) + * serialization format. + * + * @author Robert Varga + */ +final class TransactionPreCommitSuccessProxyV1 extends AbstractTransactionSuccessProxy { + private static final long serialVersionUID = 1L; + + public TransactionPreCommitSuccessProxyV1() { + // For Externalizable + } + + TransactionPreCommitSuccessProxyV1(final TransactionPreCommitSuccess success) { + super(success); + } + + @Override + protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target) { + return new TransactionPreCommitSuccess(target); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java new file mode 100644 index 0000000000..ae845a727f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractShardedTransaction.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; + +/** + * Abstract base class for concrete {@link DOMStoreTransaction} implementations. It holds a reference to the associated + * {@link ClientTransaction}. This abstraction layer is needed to isolate end users, who interact with + * {@link DOMStoreTransaction} from the internal implementation. + * + * @author Robert Varga + */ +abstract class AbstractShardedTransaction implements DOMStoreTransaction { + private final ClientTransaction tx; + + AbstractShardedTransaction(final ClientTransaction tx) { + this.tx = Preconditions.checkNotNull(tx); + } + + @Override + public final Object getIdentifier() { + return tx.getIdentifier(); + } + + final ClientTransaction transaction() { + return tx; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java new file mode 100644 index 0000000000..cadb61a460 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * Proxy implementation of {@link DOMStoreReadTransaction}. It routes all requests to the backing + * {@link ClientTransaction}. This class is not final to allow further subclassing by + * {@link ShardedDOMStoreReadWriteTransaction}. + * + * @author Robert Varga + */ +class ShardedDOMStoreReadTransaction extends AbstractShardedTransaction implements DOMStoreReadTransaction { + ShardedDOMStoreReadTransaction(final ClientTransaction tx) { + super(tx); + } + + @Override + public final void close() { + transaction().abort(); + } + + @Override + public final CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + return transaction().read(path); + } + + @Override + public final CheckedFuture exists(final YangInstanceIdentifier path) { + return transaction().exists(path); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java new file mode 100644 index 0000000000..7172770f77 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * Proxy implementation of {@link DOMStoreReadWriteTransaction}. It routes all requests to the backing + * {@link ClientTransaction}. + * + * @author Robert Varga + */ +final class ShardedDOMStoreReadWriteTransaction extends ShardedDOMStoreReadTransaction implements DOMStoreReadWriteTransaction { + + ShardedDOMStoreReadWriteTransaction(final ClientTransaction tx) { + super(tx); + } + + @Override + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + transaction().write(path, data); + } + + @Override + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + transaction().merge(path, data); + } + + @Override + public void delete(final YangInstanceIdentifier path) { + transaction().delete(path); + } + + @Override + public DOMStoreThreePhaseCommitCohort ready() { + return transaction().ready(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java new file mode 100644 index 0000000000..97ed6ba4fb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; + +/** + * Implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}. It wraps + * {@link ClientTransaction} into proxies like {@link ShardedDOMStoreReadTransaction} to provide isolation. + * + * @author Robert Varga + */ +final class ShardedDOMStoreTransactionChain implements DOMStoreTransactionChain { + private final ClientLocalHistory history; + + ShardedDOMStoreTransactionChain(final ClientLocalHistory history) { + this.history = Preconditions.checkNotNull(history); + } + + @Override + public DOMStoreReadTransaction newReadOnlyTransaction() { + return new ShardedDOMStoreReadTransaction(history.createTransaction()); + } + + @Override + public DOMStoreReadWriteTransaction newReadWriteTransaction() { + return new ShardedDOMStoreReadWriteTransaction(history.createTransaction()); + } + + @Override + public DOMStoreWriteTransaction newWriteOnlyTransaction() { + return new ShardedDOMStoreWriteTransaction(history.createTransaction()); + } + + @Override + public void close() { + history.close(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java new file mode 100644 index 0000000000..6161620583 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * Proxy implementation of {@link DOMStoreWriteTransaction}. It routes all requests to the backing + * {@link ClientTransaction}. + * + * @author Robert Varga + */ +final class ShardedDOMStoreWriteTransaction extends AbstractShardedTransaction implements DOMStoreWriteTransaction { + ShardedDOMStoreWriteTransaction(final ClientTransaction tx) { + super(tx); + } + + @Override + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + transaction().write(path, data); + } + + @Override + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + transaction().merge(path, data); + } + + @Override + public void delete(final YangInstanceIdentifier path) { + transaction().delete(path); + } + + @Override + public DOMStoreThreePhaseCommitCohort ready() { + return transaction().ready(); + } + + @Override + public final void close() { + transaction().abort(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java new file mode 100644 index 0000000000..f364994e3d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Preconditions; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base class for client view of a history. This class has two implementations, one for normal local histories + * and the other for single transactions. + * + * @author Robert Varga + */ +abstract class AbstractClientHistory extends LocalAbortable implements Identifiable { + static enum State { + IDLE, + TX_OPEN, + CLOSED, + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state"); + + private final Map histories = new ConcurrentHashMap<>(); + private final DistributedDataStoreClientBehavior client; + private final LocalHistoryIdentifier identifier; + + private volatile State state = State.IDLE; + + AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { + this.client = Preconditions.checkNotNull(client); + this.identifier = Preconditions.checkNotNull(identifier); + Preconditions.checkArgument(identifier.getCookie() == 0); + } + + final State state() { + return state; + } + + final void updateState(final State expected, final State next) { + final boolean success = STATE_UPDATER.compareAndSet(this, expected, next); + Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state); + } + + final LocalHistoryIdentifier getHistoryForCookie(final Long cookie) { + LocalHistoryIdentifier ret = histories.get(cookie); + if (ret == null) { + ret = new LocalHistoryIdentifier(identifier.getClientId(), identifier.getHistoryId(), cookie); + final LocalHistoryIdentifier existing = histories.putIfAbsent(cookie, ret); + if (existing != null) { + ret = existing; + } + } + + return ret; + } + + @Override + public final LocalHistoryIdentifier getIdentifier() { + return identifier; + } + + final DistributedDataStoreClientBehavior getClient() { + return client; + } + + @Override + final void localAbort(final Throwable cause) { + LOG.debug("Force-closing history {}", getIdentifier(), cause); + state = State.CLOSED; + } + + /** + * Callback invoked from {@link ClientTransaction} when a transaction has been sub + * + * @param transaction Transaction handle + */ + void onTransactionReady(final ClientTransaction transaction) { + client.transactionComplete(transaction); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java new file mode 100644 index 0000000000..60919c05a4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -0,0 +1,228 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +/** + * Class translating transaction operations towards a particular backend shard. + * + * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state + * transitions coming from interactions with backend are expected to be thread-safe. + * + * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision + * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction. + * + * @author Robert Varga + */ +abstract class AbstractProxyTransaction implements Identifiable { + private final DistributedDataStoreClientBehavior client; + + private long sequence; + private boolean sealed; + + AbstractProxyTransaction(final DistributedDataStoreClientBehavior client) { + this.client = Preconditions.checkNotNull(client); + } + + /** + * Instantiate a new tracker for a transaction. This method bases its decision on which implementation to use + * based on provided {@link ShardBackendInfo}. If no information is present, it will choose the remote + * implementation, which is fine, as the queueing logic in ClientActorBehavior will hold on to the requests until + * the backend is located. + * + * @param client Client behavior + * @param historyId Local history identifier + * @param transactionId Transaction identifier + * @param backend Optional backend identifier + * @return A new state tracker + */ + static AbstractProxyTransaction create(final DistributedDataStoreClientBehavior client, + final LocalHistoryIdentifier historyId, final long transactionId, + final java.util.Optional backend) { + + final java.util.Optional dataTree = backend.flatMap(t -> t.getDataTree()); + final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId); + if (dataTree.isPresent()) { + return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot()); + } else { + return new RemoteProxyTransaction(client, identifier); + } + } + + final DistributedDataStoreClientBehavior client() { + return client; + } + + final long nextSequence() { + return sequence++; + } + + final void delete(final YangInstanceIdentifier path) { + checkSealed(); + doDelete(path); + } + + final void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + checkSealed(); + doMerge(path, data); + } + + final void write(final YangInstanceIdentifier path, final NormalizedNode data) { + checkSealed(); + doWrite(path, data); + } + + final CheckedFuture exists(final YangInstanceIdentifier path) { + checkSealed(); + return doExists(path); + } + + final CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + checkSealed(); + return doRead(path); + } + + /** + * Seal this transaction before it is either + */ + final void seal() { + checkSealed(); + doSeal(); + sealed = true; + } + + private void checkSealed() { + Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier()); + } + + /** + * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message + * being sent to the backend. + */ + final void abort() { + checkSealed(); + doAbort(); + } + + /** + * Commit this transaction, possibly in a coordinated fashion. + * + * @param coordinated True if this transaction should be coordinated across multiple participants. + * @return Future completion + */ + final ListenableFuture directCommit() { + checkSealed(); + + final SettableFuture ret = SettableFuture.create(); + client().sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(false)), t -> { + if (t instanceof TransactionCommitSuccess) { + ret.set(Boolean.TRUE); + } else if (t instanceof RequestFailure) { + ret.setException(((RequestFailure) t).getCause()); + } else { + ret.setException(new IllegalStateException("Unhandled response " + t.getClass())); + } + }); + return ret; + } + + void abort(final VotingFuture ret) { + checkSealed(); + + client.sendRequest(nextSequence(), new TransactionAbortRequest(getIdentifier(), client().self()), t -> { + if (t instanceof TransactionAbortSuccess) { + ret.voteYes(); + } else if (t instanceof RequestFailure) { + ret.voteNo(((RequestFailure) t).getCause()); + } else { + ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + } + }); + } + + void canCommit(final VotingFuture ret) { + checkSealed(); + + client.sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(true)), t -> { + if (t instanceof TransactionCanCommitSuccess) { + ret.voteYes(); + } else if (t instanceof RequestFailure) { + ret.voteNo(((RequestFailure) t).getCause()); + } else { + ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + } + }); + } + + void preCommit(final VotingFuture ret) { + checkSealed(); + + client.sendRequest(nextSequence(), new TransactionPreCommitRequest(getIdentifier(), client().self()), t-> { + if (t instanceof TransactionPreCommitSuccess) { + ret.voteYes(); + } else if (t instanceof RequestFailure) { + ret.voteNo(((RequestFailure) t).getCause()); + } else { + ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + } + }); + } + + void doCommit(final VotingFuture ret) { + checkSealed(); + + client.sendRequest(nextSequence(), new TransactionDoCommitRequest(getIdentifier(), client().self()), t-> { + if (t instanceof TransactionCommitSuccess) { + ret.voteYes(); + } else if (t instanceof RequestFailure) { + ret.voteNo(((RequestFailure) t).getCause()); + } else { + ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass())); + } + }); + } + + abstract void doDelete(final YangInstanceIdentifier path); + + abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode data); + + abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode data); + + abstract CheckedFuture doExists(final YangInstanceIdentifier path); + + abstract CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path); + + abstract void doSeal(); + + abstract void doAbort(); + + abstract TransactionRequest doCommit(boolean coordinated); + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java new file mode 100644 index 0000000000..51b1700773 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; + +/** + * Base class for internal {@link DOMStoreThreePhaseCommitCohort} implementation. It contains utility constants for + * wide reuse. + * + * @author Robert Varga + */ +abstract class AbstractTransactionCommitCohort implements DOMStoreThreePhaseCommitCohort { + static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); + static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java index aded49b144..8f2ee88563 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java @@ -7,13 +7,12 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import akka.actor.ActorRef; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; /** * Client-side view of a local history. This class tracks all state related to a particular history and routes @@ -26,41 +25,42 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie * @author Robert Varga */ @Beta -public final class ClientLocalHistory implements AutoCloseable { - private static final AtomicIntegerFieldUpdater STATE_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(ClientLocalHistory.class, "state"); - private static final int IDLE_STATE = 0; - private static final int CLOSED_STATE = 1; +public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable { - private final ClientIdentifier clientId; - private final long historyId; - private final ActorRef backendActor; - private final ActorRef clientActor; + private static final AtomicLongFieldUpdater NEXT_TX_UPDATER = + AtomicLongFieldUpdater.newUpdater(ClientLocalHistory.class, "nextTx"); - private volatile int state = IDLE_STATE; + // Used via NEXT_TX_UPDATER + @SuppressWarnings("unused") + private volatile long nextTx = 0; - ClientLocalHistory(final DistributedDataStoreClientBehavior client, final long historyId, - final ActorRef backendActor) { - this.clientActor = client.self(); - this.backendActor = Preconditions.checkNotNull(backendActor); - this.clientId = Verify.verifyNotNull(client.getIdentifier()); - this.historyId = historyId; + ClientLocalHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) { + super(client, historyId); } - private void checkNotClosed() { - if (state == CLOSED_STATE) { - throw new IllegalStateException("Local history " + new LocalHistoryIdentifier(clientId, historyId) + " is closed"); - } + public ClientTransaction createTransaction() { + final State local = state(); + Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local); + updateState(local, State.TX_OPEN); + + return new ClientTransaction(getClient(), this, + new TransactionIdentifier(getIdentifier(), NEXT_TX_UPDATER.getAndIncrement(this))); } @Override public void close() { - if (STATE_UPDATER.compareAndSet(this, IDLE_STATE, CLOSED_STATE)) { - // FIXME: signal close to both client actor and backend actor - } else if (state != CLOSED_STATE) { - throw new IllegalStateException("Cannot close history with an open transaction"); + final State local = state(); + if (local != State.CLOSED) { + Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this); + updateState(local, State.CLOSED); } } - // FIXME: add client requests related to a particular local history + @Override + void onTransactionReady(final ClientTransaction transaction) { + final State local = state(); + Verify.verify(local == State.TX_OPEN, "Local history %s is in unexpected state %s", this, local); + updateState(local, State.IDLE); + super.onTransactionReady(transaction); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java new file mode 100644 index 0000000000..f5f545a48e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.annotations.Beta; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client-side view of a free-standing transaction. + * + * This interface is used by the world outside of the actor system and in the actor system it is manifested via + * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to + * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor. + * + * It is internally composed of multiple {@link RemoteProxyTransaction}s, each responsible for a component shard. + * + * Implementation is quite a bit complex, and involves cooperation with {@link AbstractClientHistory} for tracking + * gaps in transaction identifiers seen by backends. + * + * These gaps need to be accounted for in the transaction setup message sent to a particular backend, so it can verify + * that the requested transaction is in-sequence. This is critical in ensuring that transactions (which are independent + * entities from message queueing perspective) do not get reodered -- thus allowing multiple in-flight transactions. + * + * Alternative would be to force visibility by sending an abort request to all potential backends, but that would mean + * that even empty transactions increase load on all shards -- which would be a scalability issue. + * + * Yet another alternative would be to introduce inter-transaction dependencies to the queueing layer in client actor, + * but that would require additional indirection and complexity. + * + * @author Robert Varga + */ +@Beta +public final class ClientTransaction extends LocalAbortable implements Identifiable { + private static final Logger LOG = LoggerFactory.getLogger(ClientTransaction.class); + private static final AtomicIntegerFieldUpdater STATE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ClientTransaction.class, "state"); + private static final int OPEN_STATE = 0; + private static final int CLOSED_STATE = 1; + + private final Map proxies = new HashMap<>(); + private final TransactionIdentifier transactionId; + private final AbstractClientHistory parent; + + private volatile int state = OPEN_STATE; + + ClientTransaction(final DistributedDataStoreClientBehavior client, final AbstractClientHistory parent, + final TransactionIdentifier transactionId) { + this.transactionId = Preconditions.checkNotNull(transactionId); + this.parent = Preconditions.checkNotNull(parent); + } + + private void checkNotClosed() { + Preconditions.checkState(state == OPEN_STATE, "Transaction %s is closed", transactionId); + } + + private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) { + checkNotClosed(); + + final ModuleShardBackendResolver resolver = parent.getClient().resolver(); + final Long shard = resolver.resolveShardForPath(path); + AbstractProxyTransaction ret = proxies.get(shard); + if (ret == null) { + ret = AbstractProxyTransaction.create(parent.getClient(), parent.getHistoryForCookie(shard), + transactionId.getTransactionId(), resolver.getFutureBackendInfo(shard)); + proxies.put(shard, ret); + } + return ret; + } + + @Override + public TransactionIdentifier getIdentifier() { + return transactionId; + } + + public CheckedFuture exists(final YangInstanceIdentifier path) { + return ensureProxy(path).exists(path); + } + + public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + return ensureProxy(path).read(path); + } + + public void delete(final YangInstanceIdentifier path) { + ensureProxy(path).delete(path); + } + + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + ensureProxy(path).merge(path, data); + } + + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + ensureProxy(path).write(path, data); + } + + private boolean ensureClosed() { + final int local = state; + if (local != CLOSED_STATE) { + final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE); + Preconditions.checkState(success, "Transaction %s raced during close", this); + return true; + } else { + return false; + } + } + + public DOMStoreThreePhaseCommitCohort ready() { + Preconditions.checkState(ensureClosed(), "Attempted to submit a closed transaction %s", this); + + for (AbstractProxyTransaction p : proxies.values()) { + p.seal(); + } + parent.onTransactionReady(this); + + switch (proxies.size()) { + case 0: + return EmptyTransactionCommitCohort.INSTANCE; + case 1: + return new DirectTransactionCommitCohort(Iterables.getOnlyElement(proxies.values())); + default: + return new ClientTransactionCommitCohort(proxies.values()); + } + } + + /** + * Release all state associated with this transaction. + */ + public void abort() { + if (ensureClosed()) { + for (AbstractProxyTransaction proxy : proxies.values()) { + proxy.abort(); + } + proxies.clear(); + } + } + + @Override + void localAbort(final Throwable cause) { + LOG.debug("Aborting transaction {}", getIdentifier(), cause); + abort(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java new file mode 100644 index 0000000000..2521c38f91 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Collection; +import java.util.List; + +final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohort { + private final List proxies; + + /** + * @param clientTransaction + */ + ClientTransactionCommitCohort(final Collection proxies) { + this.proxies = ImmutableList.copyOf(proxies); + } + + @Override + public ListenableFuture canCommit() { + /* + * Issue the request to commit for all participants. We will track the results and report them. + */ + final VotingFuture ret = new VotingFuture<>(Boolean.TRUE, proxies.size()); + for (AbstractProxyTransaction proxy : proxies) { + proxy.canCommit(ret); + } + + return ret; + } + + @Override + public ListenableFuture preCommit() { + final VotingFuture ret = new VotingFuture<>(null, proxies.size()); + for (AbstractProxyTransaction proxy : proxies) { + proxy.preCommit(ret); + } + + return ret; + } + + @Override + public ListenableFuture commit() { + final VotingFuture ret = new VotingFuture<>(null, proxies.size()); + for (AbstractProxyTransaction proxy : proxies) { + proxy.doCommit(ret); + } + + return ret; + } + + @Override + public ListenableFuture abort() { + final VotingFuture ret = new VotingFuture<>(null, proxies.size()); + for (AbstractProxyTransaction proxy : proxies) { + proxy.abort(ret); + } + + return ret; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java new file mode 100644 index 0000000000..007ac53d98 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * An {@link AbstractTransactionCommitCohort} implementation for transactions which contain a single proxy. Since there + * is only one proxy, + * + * @author Robert Varga + */ +final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohort { + private final AbstractProxyTransaction proxy; + + /** + * @param clientTransaction + */ + DirectTransactionCommitCohort(final AbstractProxyTransaction proxy) { + this.proxy = Preconditions.checkNotNull(proxy); + } + + @Override + public ListenableFuture canCommit() { + return proxy.directCommit(); + } + + @Override + public ListenableFuture preCommit() { + return VOID_FUTURE; + } + + @Override + public ListenableFuture abort() { + return VOID_FUTURE; + } + + @Override + public ListenableFuture commit() { + return VOID_FUTURE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java index 82c839e438..0d22d564fe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClient.java @@ -8,7 +8,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.annotations.Beta; -import java.util.concurrent.CompletionStage; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; @@ -23,19 +23,22 @@ import org.opendaylight.yangtools.concepts.Identifiable; @Beta public interface DistributedDataStoreClient extends Identifiable, AutoCloseable { @Override - ClientIdentifier getIdentifier(); + @Nonnull ClientIdentifier getIdentifier(); @Override void close(); /** - * Create a new local history. This method initiates an asynchronous instantiation of a local history on the back - * end. ClientLocalHistory represents the interface exposed to the client. + * Create a new local history. ClientLocalHistory represents the interface exposed to the client. * - * @return Future client history handle + * @return Client history handle */ - CompletionStage createLocalHistory(); - - // TODO: add methods required by DistributedDataStore + @Nonnull ClientLocalHistory createLocalHistory(); + /** + * Create a new free-standing transaction. + * + * @return Client transaction handle + */ + @Nonnull ClientTransaction createTransaction(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index 364e462e57..5aca99ae59 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -9,9 +9,16 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; import akka.actor.Status; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; +import com.google.common.base.Throwables; +import com.google.common.base.Verify; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior; import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -47,12 +54,19 @@ import org.slf4j.LoggerFactory; final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class); + private final Map transactions = new ConcurrentHashMap<>(); + private final Map histories = new ConcurrentHashMap<>(); + private final AtomicLong nextHistoryId = new AtomicLong(1); + private final AtomicLong nextTransactionId = new AtomicLong(); private final ModuleShardBackendResolver resolver; - private long nextHistoryId; + private final SingleClientHistory singleHistory; + + private volatile Throwable aborted; DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) { super(context); resolver = new ModuleShardBackendResolver(actorContext); + singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0)); } // @@ -63,26 +77,37 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple @Override protected void haltClient(final Throwable cause) { - // FIXME: Add state flushing here once we have state + // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up + // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor) + // thread. + if (aborted != null) { + abortOperations(cause); + } } - private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior, - final CompletableFuture future) { - final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++); - LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future); + private void abortOperations(final Throwable cause) { + // This acts as a barrier, application threads check this after they have added an entry in the maps, + // and if they observe aborted being non-null, they will perform their cleanup and not return the handle. + aborted = cause; - // FIXME: initiate backend instantiation - future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); - return currentBehavior; + for (ClientLocalHistory h : histories.values()) { + h.localAbort(cause); + } + histories.clear(); + + for (ClientTransaction t : transactions.values()) { + t.localAbort(cause); + } + transactions.clear(); } - private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) { - // FIXME: Add shutdown procedures here + private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) { + abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down")); return null; } @Override - protected ClientActorBehavior onCommand(final Object command) { + protected DistributedDataStoreClientBehavior onCommand(final Object command) { if (command instanceof GetClientRequest) { ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender()); } else { @@ -98,11 +123,41 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple // // + private static V returnIfOperational(final Map map, final K key, final V value, + final Throwable aborted) { + Verify.verify(map.put(key, value) == null); + + if (aborted != null) { + try { + value.localAbort(aborted); + } catch (Exception e) { + LOG.debug("Close of {} failed", value, e); + } + map.remove(key, value); + throw Throwables.propagate(aborted); + } + + return value; + } + @Override - public CompletionStage createLocalHistory() { - final CompletableFuture future = new CompletableFuture<>(); - context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future)); - return future; + public ClientLocalHistory createLocalHistory() { + final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), + nextHistoryId.getAndIncrement()); + final ClientLocalHistory history = new ClientLocalHistory(this, historyId); + LOG.debug("{}: creating a new local history {}", persistenceId(), history); + + return returnIfOperational(histories, historyId, history, aborted); + } + + @Override + public ClientTransaction createTransaction() { + final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(), + nextTransactionId.getAndIncrement()); + final ClientTransaction tx = new ClientTransaction(this, singleHistory, txId); + LOG.debug("{}: creating a new transaction {}", persistenceId(), tx); + + return returnIfOperational(transactions, txId, tx, aborted); } @Override @@ -114,4 +169,16 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple protected ModuleShardBackendResolver resolver() { return resolver; } + + void transactionComplete(final ClientTransaction transaction) { + transactions.remove(transaction.getIdentifier()); + } + + void sendRequest(final long sequence, final TransactionRequest request, final Consumer> completer) { + sendRequest(sequence, request, response -> { + completer.accept(response); + return this; + }); + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java new file mode 100644 index 0000000000..7032660068 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; + +/** + * An {@link AbstractTransactionCommitCohort} for use with empty transactions. This relies on the fact that no backends + * have been touched, hence all state book-keeping needs to happen only locally and shares fate with the coordinator. + * + * Therefore all methods can finish immediately without any effects. + * + * @author Robert Varga + */ +final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort { + static final DOMStoreThreePhaseCommitCohort INSTANCE = new EmptyTransactionCommitCohort(); + + private EmptyTransactionCommitCohort() { + // Hidden + } + + @Override + public ListenableFuture canCommit() { + return TRUE_FUTURE; + } + + @Override + public ListenableFuture preCommit() { + return VOID_FUTURE; + } + + @Override + public ListenableFuture abort() { + return VOID_FUTURE; + } + + @Override + public ListenableFuture commit() { + return VOID_FUTURE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java new file mode 100644 index 0000000000..b21b46dab8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/FailedDataTreeModification.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.function.Supplier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; + +/** + * An implementation of DataTreeModification which throws a specified {@link RuntimeException} when any of its methods + * are invoked. + * + * @author Robert Varga + */ +final class FailedDataTreeModification implements DataTreeModification { + private final Supplier supplier; + + FailedDataTreeModification(final Supplier supplier) { + this.supplier = Preconditions.checkNotNull(supplier); + } + + @Override + public Optional> readNode(final YangInstanceIdentifier path) { + throw supplier.get(); + } + + @Override + public DataTreeModification newModification() { + throw supplier.get(); + } + + @Override + public void delete(final YangInstanceIdentifier path) { + throw supplier.get(); + } + + @Override + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + throw supplier.get(); + } + + @Override + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + throw supplier.get(); + } + + @Override + public void ready() { + throw supplier.get(); + } + + @Override + public void applyToCursor(final DataTreeModificationCursor cursor) { + throw supplier.get(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalAbortable.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalAbortable.java new file mode 100644 index 0000000000..5b0708f120 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalAbortable.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +/** + * Common interface for client histories and client transactions, which can be aborted immediately without replicating + * the effect to the backend. This is needed for abrupt shutdowns. + * + * Since classes which need to expose this functionality do not need a base class, this is an abstract class and not + * an interface -- which allows us to not leak the {@link #localAbort(Throwable)} method. + * + * @author Robert Varga + */ +abstract class LocalAbortable { + /** + * Immediately abort this object. + * + * @param cause Failure which caused this abort. + */ + abstract void localAbort(Throwable cause); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java new file mode 100644 index 0000000000..1dec84631a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import java.util.function.Consumer; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with + * the client instance. + * + * It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations + * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard + * leader. + * + * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the + * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern. + * + * @author Robert Varga + */ +@NotThreadSafe +final class LocalProxyTransaction extends AbstractProxyTransaction { + private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); + private static final Consumer> ABORT_COMPLETER = response -> { + LOG.debug("Abort completed with {}", response); + }; + + private final TransactionIdentifier identifier; + private DataTreeModification modification; + + LocalProxyTransaction(final DistributedDataStoreClientBehavior client, + final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) { + super(client); + this.identifier = Preconditions.checkNotNull(identifier); + this.modification = snapshot.newModification(); + } + + @Override + public TransactionIdentifier getIdentifier() { + return identifier; + } + + @Override + void doDelete(final YangInstanceIdentifier path) { + modification.delete(path); + } + + @Override + void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { + modification.merge(path, data); + } + + @Override + void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { + modification.write(path, data); + } + + @Override + CheckedFuture doExists(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(modification.readNode(path).isPresent()); + } + + @Override + CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(modification.readNode(path)); + } + + @Override + void doAbort() { + client().sendRequest(nextSequence(), new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER); + modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted")); + } + + @Override + CommitLocalTransactionRequest doCommit(final boolean coordinated) { + final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, client().self(), + modification, coordinated); + modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted")); + return ret; + } + + @Override + void doSeal() { + modification.ready(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index 6f15c72a46..bef863ddf4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -7,23 +7,30 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import akka.dispatch.ExecutionContexts; +import akka.dispatch.OnComplete; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableBiMap.Builder; import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfo; import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfoResolver; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.compat.java8.FutureConverters; +import scala.concurrent.ExecutionContext; /** * {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named @@ -33,7 +40,11 @@ import scala.compat.java8.FutureConverters; * @author Robert Varga */ final class ModuleShardBackendResolver extends BackendInfoResolver { + private static final ExecutionContext DIRECT_EXECUTION_CONTEXT = + ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()); + private static final CompletableFuture NULL_FUTURE = CompletableFuture.completedFuture(null); private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class); + /** * Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure. * All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain @@ -44,7 +55,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver shards = ImmutableBiMap.of(); + @GuardedBy("this") + private long nextShard = 1; + + private volatile BiMap shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L); // FIXME: we really need just ActorContext.findPrimaryShardAsync() ModuleShardBackendResolver(final ActorContext actorContext) { @@ -63,17 +77,49 @@ final class ModuleShardBackendResolver extends BackendInfoResolver b = ImmutableBiMap.builder(); + b.putAll(shards); + b.put(shardName, cookie); + shards = b.build(); + } + } + } + + return cookie; + } + @Override - protected CompletionStage resolveBackendInfo(final Long cookie) { + protected CompletableFuture resolveBackendInfo(final Long cookie) { final String shardName = shards.inverse().get(cookie); if (shardName == null) { LOG.warn("Failing request for non-existent cookie {}", cookie); - return CompletableFuture.completedFuture(null); + return NULL_FUTURE; } + final CompletableFuture ret = new CompletableFuture<>(); + + actorContext.findPrimaryShardAsync(shardName).onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable t, final PrimaryShardInfo v) { + if (t != null) { + ret.completeExceptionally(t); + } else { + ret.complete(createBackendInfo(v, shardName, cookie)); + } + } + }, DIRECT_EXECUTION_CONTEXT); + LOG.debug("Resolving cookie {} to shard {}", cookie, shardName); - return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)) - .thenApply(o -> createBackendInfo(o, shardName, cookie)); + return ret; } private static ABIVersion toABIVersion(final short version) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java new file mode 100644 index 0000000000..13b7fb4f56 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionMerge; +import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently + * not known or is known to be not co-located with the client. + * + * It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for + * maintaining any submitted operations until the leader is discovered. + * + * This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state + * transitions based on backend responses are thread-safe. + * + * @author Robert Varga + */ +final class RemoteProxyTransaction extends AbstractProxyTransaction { + private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class); + + // FIXME: make this tuneable + private static final int REQUEST_MAX_MODIFICATIONS = 1000; + + private final ModifyTransactionRequestBuilder builder; + + private boolean builderBusy; + + private volatile Exception operationFailure; + + RemoteProxyTransaction(final DistributedDataStoreClientBehavior client, + final TransactionIdentifier identifier) { + super(client); + builder = new ModifyTransactionRequestBuilder(identifier, client.self()); + } + + @Override + public TransactionIdentifier getIdentifier() { + return builder.getIdentifier(); + } + + @Override + void doDelete(final YangInstanceIdentifier path) { + appendModification(new TransactionDelete(path)); + } + + @Override + void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { + appendModification(new TransactionMerge(path, data)); + } + + @Override + void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { + appendModification(new TransactionWrite(path, data)); + } + + private CheckedFuture sendReadRequest(final AbstractReadTransactionRequest request, + final Consumer> completer, final ListenableFuture future) { + // Check if a previous operation failed. If it has, do not bother sending anything and report a failure + final Exception local = operationFailure; + if (local != null) { + return Futures.immediateFailedCheckedFuture(new ReadFailedException("Previous operation failed", local)); + } + + // Make sure we send any modifications before issuing a read + ensureFlushedBuider(); + client().sendRequest(nextSequence(), request, completer); + return MappingCheckedFuture.create(future, ReadFailedException.MAPPER); + } + + @Override + CheckedFuture doExists(final YangInstanceIdentifier path) { + final SettableFuture future = SettableFuture.create(); + return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), client().self(), path), + t -> completeExists(future, t), future); + } + + @Override + CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { + final SettableFuture>> future = SettableFuture.create(); + return sendReadRequest(new ReadTransactionRequest(getIdentifier(), client().self(), path), + t -> completeRead(future, t), future); + } + + @Override + void doAbort() { + ensureInitializedBuider(); + builder.setAbort(); + flushBuilder(); + } + + private void ensureInitializedBuider() { + if (!builderBusy) { + builderBusy = true; + } + } + + private void ensureFlushedBuider() { + if (builderBusy) { + flushBuilder(); + } + } + + private void flushBuilder() { + client().sendRequest(nextSequence(), builder.build(), this::completeModify); + builderBusy = false; + } + + private void appendModification(final TransactionModification modification) { + if (operationFailure == null) { + ensureInitializedBuider(); + + builder.addModification(modification); + if (builder.size() >= REQUEST_MAX_MODIFICATIONS) { + flushBuilder(); + } + } else { + LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier()); + } + } + + private void completeModify(final Response response) { + LOG.debug("Modification request completed with {}", response); + + if (response instanceof TransactionSuccess) { + // Happy path no-op + } else { + recordFailedResponse(response); + } + } + + private Exception recordFailedResponse(final Response response) { + final Exception failure; + if (response instanceof RequestFailure) { + failure = ((RequestFailure) response).getCause(); + } else { + LOG.warn("Unhandled response {}", response); + failure = new IllegalArgumentException("Unhandled response " + response.getClass()); + } + + if (operationFailure == null) { + LOG.debug("Transaction {} failed", getIdentifier(), failure); + operationFailure = failure; + } + return failure; + } + + private void failFuture(final SettableFuture future, final Response response) { + future.setException(recordFailedResponse(response)); + } + + private void completeExists(final SettableFuture future, final Response response) { + LOG.debug("Exists request completed with {}", response); + + if (response instanceof ExistsTransactionSuccess) { + future.set(((ExistsTransactionSuccess) response).getExists()); + } else { + failFuture(future, response); + } + } + + private void completeRead(final SettableFuture>> future, final Response response) { + LOG.debug("Read request completed with {}", response); + + if (response instanceof ReadTransactionSuccess) { + future.set(((ReadTransactionSuccess) response).getData()); + } else { + failFuture(future, response); + } + } + + @Override + ModifyTransactionRequest doCommit(final boolean coordinated) { + ensureInitializedBuider(); + builder.setCommit(coordinated); + + final ModifyTransactionRequest ret = builder.build(); + builderBusy = false; + return ret; + } + + @Override + void doSeal() { + // No-op + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java new file mode 100644 index 0000000000..b57e9b669e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; + +/** + * An {@link AbstractClientHistory} which handles free-standing transactions. + * + * @author Robert Varga + */ +final class SingleClientHistory extends AbstractClientHistory { + protected SingleClientHistory(final DistributedDataStoreClientBehavior client, + final LocalHistoryIdentifier identifier) { + super(client, identifier); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/VotingFuture.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/VotingFuture.java new file mode 100644 index 0000000000..b6aba31e50 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/VotingFuture.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.util.concurrent.AbstractFuture; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import javax.annotation.concurrent.GuardedBy; + +/** + * An {@link AbstractFuture} implementation which requires a certain number of votes before it completes. If all votes + * are 'yes', then it completes with a pre-determined value. If any of the votes are 'no', the future completes with + * an exception. This exception corresponds to the cause reported by the first 'no' vote, with all subsequent votes + * added as suppressed exceptions. + * + * Implementation is geared toward positive votes. Negative votes have to synchronize and therefore are more likely + * to see contention. + * + * @author Robert Varga + * + * @param Type of value returned on success + */ +class VotingFuture extends AbstractFuture { + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater VOTES_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(VotingFuture.class, "neededVotes"); + + private final T result; + + @GuardedBy("failures") + private final Collection failures = new ArrayList<>(0); + @SuppressWarnings("unused") + private volatile int neededVotes; + + VotingFuture(final T result, final int requiredVotes) { + Preconditions.checkArgument(requiredVotes > 0); + this.neededVotes = requiredVotes; + + // null is okay to allow Void type + this.result = result; + } + + void voteYes() { + if (castVote()) { + synchronized (failures) { + resolveResult(); + } + } + } + + void voteNo(final Throwable cause) { + synchronized (failures) { + failures.add(cause); + if (castVote()) { + resolveResult(); + } + } + } + + private boolean castVote() { + final int votes = VOTES_UPDATER.decrementAndGet(this); + Verify.verify(votes >= 0); + return votes == 0; + } + + @GuardedBy("failures") + private void resolveResult() { + final Iterator it = failures.iterator(); + if (!it.hasNext()) { + set(result); + return; + } + + final Throwable t = it.next(); + while (it.hasNext()) { + t.addSuppressed(it.next()); + } + + setException(t); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfoResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfoResolver.java index 2868998dac..b2fbc6b336 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfoResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/BackendInfoResolver.java @@ -9,9 +9,13 @@ package org.opendaylight.controller.cluster.datastore.actors.client; import akka.actor.ActorRef; import com.google.common.base.Preconditions; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; @@ -28,12 +32,26 @@ import org.slf4j.LoggerFactory; @ThreadSafe public abstract class BackendInfoResolver { private static final Logger LOG = LoggerFactory.getLogger(BackendInfoResolver.class); - private final ConcurrentMap> backends = new ConcurrentHashMap<>(); + private final ConcurrentMap> backends = new ConcurrentHashMap<>(); - // This is what the client needs to start processing. For as long as we do not have this, we should not complete - // this stage until we have this information - public final CompletionStage getBackendInfo(final long cookie) { - return backends.computeIfAbsent(cookie, this::resolveBackendInfo); + /** + * Return the currently-resolved backend information, if available. This method is guaranteed not to block, but will + * initiate resolution of the information if there is none. + * + * @param cookie Backend cookie + * @return Backend information, if available + */ + public final Optional getFutureBackendInfo(final Long cookie) { + final Future f = lookupBackend(cookie); + if (f.isDone()) { + try { + return Optional.of(f.get()); + } catch (InterruptedException | ExecutionException e) { + LOG.debug("Resolution of {} failed", f, e); + } + } + + return Optional.empty(); } /** @@ -55,9 +73,9 @@ public abstract class BackendInfoResolver { * requests information which is not currently cached. * * @param cookie Backend cookie - * @return A {@link CompletionStage} resulting in information about the backend + * @return A {@link CompletableFuture} resulting in information about the backend */ - protected abstract @Nonnull CompletionStage resolveBackendInfo(final @Nonnull Long cookie); + protected abstract @Nonnull CompletableFuture resolveBackendInfo(final @Nonnull Long cookie); /** * Invalidate previously-resolved shard information. This method is invoked when a timeout is detected @@ -66,4 +84,14 @@ public abstract class BackendInfoResolver { * @param info Previous promise of backend information */ protected abstract void invalidateBackendInfo(@Nonnull CompletionStage info); + + // This is what the client needs to start processing. For as long as we do not have this, we should not complete + // this stage until we have this information + final CompletionStage getBackendInfo(final Long cookie) { + return lookupBackend(cookie); + } + + private CompletableFuture lookupBackend(final Long cookie) { + return backends.computeIfAbsent(Preconditions.checkNotNull(cookie), this::resolveBackendInfo); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java index 530d7e277f..c78862719e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java @@ -117,5 +117,4 @@ public class ClientActorContext extends AbstractClientActorContext implements Id queues.clear(); } - } -- 2.36.6