--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Abstract base class for serialization proxies associated with {@link LocalHistoryRequest}s.
+ *
+ * @author Robert Varga
+ *
+ * @param <T> Message type
+ */
+abstract class AbstractLocalHistoryRequestProxy<T extends LocalHistoryRequest<T>> extends AbstractRequestProxy<LocalHistoryIdentifier, T> {
+ private static final long serialVersionUID = 1L;
+
+ AbstractLocalHistoryRequestProxy() {
+ // For Externalizable
+ }
+
+ AbstractLocalHistoryRequestProxy(final T request) {
+ super(request);
+ }
+
+ @Override
+ protected final LocalHistoryIdentifier readTarget(final DataInput in) throws IOException {
+ return LocalHistoryIdentifier.readFrom(in);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Request to create a new local history.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class CreateLocalHistoryRequest extends LocalHistoryRequest<CreateLocalHistoryRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public CreateLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+ super(target, replyTo);
+ }
+
+ private CreateLocalHistoryRequest(final CreateLocalHistoryRequest request, final ABIVersion version) {
+ super(request, version);
+ }
+
+ @Override
+ protected AbstractLocalHistoryRequestProxy<CreateLocalHistoryRequest> externalizableProxy(final ABIVersion version) {
+ return new CreateLocalHistoryRequestProxyV1(this);
+ }
+
+ @Override
+ protected CreateLocalHistoryRequest cloneAsVersion(final ABIVersion version) {
+ return new CreateLocalHistoryRequest(this, version);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link CreateLocalHistoryRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class CreateLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestProxy<CreateLocalHistoryRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public CreateLocalHistoryRequestProxyV1() {
+ // For Externalizable
+ }
+
+ CreateLocalHistoryRequestProxyV1(final CreateLocalHistoryRequest request) {
+ super(request);
+ }
+
+ @Override
+ protected CreateLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+ return new CreateLocalHistoryRequest(target, replyTo);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * A {@link RequestException} indicating that the backend has received a request to create a history which has already
+ * been retired.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class DeadHistoryException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ public DeadHistoryException(final long lastSeenHistory) {
+ super("Histories up to " + Long.toUnsignedString(lastSeenHistory) + " are accounted for");
+ }
+
+ @Override
+ public boolean isRetriable() {
+ return true;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Request to destroy a local history.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class DestroyLocalHistoryRequest extends LocalHistoryRequest<DestroyLocalHistoryRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public DestroyLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+ super(target, replyTo);
+ }
+
+ private DestroyLocalHistoryRequest(final DestroyLocalHistoryRequest request, final ABIVersion version) {
+ super(request, version);
+ }
+
+ @Override
+ protected AbstractLocalHistoryRequestProxy<DestroyLocalHistoryRequest> externalizableProxy(final ABIVersion version) {
+ return new DestroyLocalHistoryRequestProxyV1(this);
+ }
+
+ @Override
+ protected DestroyLocalHistoryRequest cloneAsVersion(final ABIVersion version) {
+ return new DestroyLocalHistoryRequest(this, version);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link DestroyLocalHistoryRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class DestroyLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestProxy<DestroyLocalHistoryRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public DestroyLocalHistoryRequestProxyV1() {
+ // For Externalizable
+ }
+
+ DestroyLocalHistoryRequestProxyV1(final DestroyLocalHistoryRequest request) {
+ super(request);
+ }
+
+ @Override
+ protected DestroyLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+ return new DestroyLocalHistoryRequest(target, replyTo);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+
+/**
+ * Generic {@link RequestFailure} involving a {@link LocalHistoryRequest}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class LocalHistoryFailure extends RequestFailure<LocalHistoryIdentifier, LocalHistoryFailure> {
+ private static final long serialVersionUID = 1L;
+
+ LocalHistoryFailure(final LocalHistoryIdentifier target, final RequestException cause) {
+ super(target, cause);
+ }
+
+ @Override
+ protected LocalHistoryFailure cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+
+ @Override
+ protected LocalHistoryFailureProxyV1 externalizableProxy(final ABIVersion version) {
+ return new LocalHistoryFailureProxyV1(this);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * Externalizable proxy for use with {@link LocalHistoryFailure}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class LocalHistoryFailureProxyV1 extends AbstractRequestFailureProxy<LocalHistoryIdentifier, LocalHistoryFailure> {
+ private static final long serialVersionUID = 1L;
+
+ public LocalHistoryFailureProxyV1() {
+ // For Externalizable
+ }
+
+ LocalHistoryFailureProxyV1(final LocalHistoryFailure failure) {
+ super(failure);
+ }
+
+ @Override
+ protected LocalHistoryFailure createFailure(final LocalHistoryIdentifier target, final RequestException cause) {
+ return new LocalHistoryFailure(target, cause);
+ }
+
+ @Override
+ protected LocalHistoryIdentifier readTarget(final DataInput in) throws IOException {
+ return LocalHistoryIdentifier.readFrom(in);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * Abstract base class for {@link Request}s involving specific local history. This class is visible outside of this
+ * package solely for the ability to perform a unified instanceof check.
+ *
+ * @author Robert Varga
+ *
+ * @param <T> Message type
+ */
+@Beta
+public abstract class LocalHistoryRequest<T extends LocalHistoryRequest<T>> extends Request<LocalHistoryIdentifier, T> {
+ private static final long serialVersionUID = 1L;
+
+ LocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+ super(target, replyTo);
+ }
+
+ LocalHistoryRequest(final T request, final ABIVersion version) {
+ super(request, version);
+ }
+
+ @Override
+ public final LocalHistoryFailure toRequestFailure(final RequestException cause) {
+ return new LocalHistoryFailure(getTarget(), cause);
+ }
+
+ @Override
+ protected abstract AbstractLocalHistoryRequestProxy<T> externalizableProxy(final ABIVersion version);
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.AbstractSuccessProxy;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
+
+/**
+ * Success class for {@link RequestSuccess}es involving a specific local history.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class LocalHistorySuccess extends RequestSuccess<LocalHistoryIdentifier, LocalHistorySuccess> {
+ private static final long serialVersionUID = 1L;
+
+ public LocalHistorySuccess(final LocalHistoryIdentifier target) {
+ super(target);
+ }
+
+ private LocalHistorySuccess(final LocalHistorySuccess success, final ABIVersion version) {
+ super(success, version);
+ }
+
+ @Override
+ protected LocalHistorySuccess cloneAsVersion(final ABIVersion version) {
+ return new LocalHistorySuccess(this, version);
+ }
+
+ @Override
+ protected AbstractSuccessProxy<LocalHistoryIdentifier, LocalHistorySuccess> externalizableProxy(
+ final ABIVersion version) {
+ return new LocalHistorySuccessProxyV1(this);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.AbstractSuccessProxy;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Serialization proxy associated with {@link LocalHistorySuccess}.
+ *
+ * @author Robert Varga
+ */
+final class LocalHistorySuccessProxyV1 extends AbstractSuccessProxy<LocalHistoryIdentifier, LocalHistorySuccess> {
+ private static final long serialVersionUID = 1L;
+
+ LocalHistorySuccessProxyV1() {
+ // For Externalizable
+ }
+
+ LocalHistorySuccessProxyV1(final LocalHistorySuccess success) {
+ super(success);
+ }
+
+ @Override
+ protected final LocalHistoryIdentifier readTarget(final DataInput in) throws IOException {
+ return LocalHistoryIdentifier.readFrom(in);
+ }
+
+ @Override
+ protected LocalHistorySuccess createSuccess(final LocalHistoryIdentifier target) {
+ return new LocalHistorySuccess(target);
+ }
+}
this.protocol = request.getPersistenceProtocol();
}
-
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Request to purge a local history. This request is sent by the client once it receives a successful reply to
+ * {@link DestroyLocalHistoryRequest} and indicates it has removed all state attached to a particular local history.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class PurgeLocalHistoryRequest extends LocalHistoryRequest<PurgeLocalHistoryRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public PurgeLocalHistoryRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+ super(target, replyTo);
+ }
+
+ private PurgeLocalHistoryRequest(final PurgeLocalHistoryRequest request, final ABIVersion version) {
+ super(request, version);
+ }
+
+ @Override
+ protected AbstractLocalHistoryRequestProxy<PurgeLocalHistoryRequest> externalizableProxy(final ABIVersion version) {
+ return new PurgeLocalHistoryRequestProxyV1(this);
+ }
+
+ @Override
+ protected PurgeLocalHistoryRequest cloneAsVersion(final ABIVersion version) {
+ return new PurgeLocalHistoryRequest(this, version);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link PurgeLocalHistoryRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class PurgeLocalHistoryRequestProxyV1 extends AbstractLocalHistoryRequestProxy<PurgeLocalHistoryRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public PurgeLocalHistoryRequestProxyV1() {
+ // For Externalizable
+ }
+
+ PurgeLocalHistoryRequestProxyV1(final PurgeLocalHistoryRequest request) {
+ super(request);
+ }
+
+ @Override
+ protected PurgeLocalHistoryRequest createRequest(final LocalHistoryIdentifier target, final ActorRef replyTo) {
+ return new PurgeLocalHistoryRequest(target, replyTo);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A transaction request to perform the abort step of the three-phase commit protocol.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class TransactionAbortRequest extends TransactionRequest<TransactionAbortRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionAbortRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+ super(target, replyTo);
+ }
+
+ @Override
+ protected TransactionAbortRequestProxyV1 externalizableProxy(final ABIVersion version) {
+ return new TransactionAbortRequestProxyV1(this);
+ }
+
+ @Override
+ protected TransactionAbortRequest cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionAbortRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionAbortRequestProxyV1 extends AbstractTransactionRequestProxy<TransactionAbortRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionAbortRequestProxyV1() {
+ // For Externalizable
+ }
+
+ TransactionAbortRequestProxyV1(final TransactionAbortRequest request) {
+ super(request);
+ }
+
+ @Override
+ protected TransactionAbortRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+ return new TransactionAbortRequest(target, replyTo);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a coordinated commit request initiated by a {@link ModifyTransactionRequest}
+ * or {@link CommitLocalTransactionRequest}.
+ *
+ * @author Robert Varga
+ */
+public final class TransactionAbortSuccess extends TransactionSuccess<TransactionAbortSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionAbortSuccess(final TransactionIdentifier identifier) {
+ super(identifier);
+ }
+
+ @Override
+ protected AbstractTransactionSuccessProxy<TransactionAbortSuccess> externalizableProxy(final ABIVersion version) {
+ return new TransactionAbortSuccessProxyV1(this);
+ }
+
+ @Override
+ protected TransactionAbortSuccess cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionAbortSuccess}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionAbortSuccessProxyV1 extends AbstractTransactionSuccessProxy<TransactionAbortSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionAbortSuccessProxyV1() {
+ // For Externalizable
+ }
+
+ TransactionAbortSuccessProxyV1(final TransactionAbortSuccess success) {
+ super(success);
+ }
+
+ @Override
+ protected TransactionAbortSuccess createSuccess(final TransactionIdentifier target) {
+ return new TransactionAbortSuccess(target);
+ }
+}
*/
package org.opendaylight.controller.cluster.access.commands;
-import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
/**
- * Successful reply to a coordinated commit request. It contains a reference to the actor which is handling the commit
- * process.
+ * Successful reply to a coordinated commit request initiated by a {@link ModifyTransactionRequest}
+ * or {@link CommitLocalTransactionRequest}.
*
* @author Robert Varga
*/
public final class TransactionCanCommitSuccess extends TransactionSuccess<TransactionCanCommitSuccess> {
private static final long serialVersionUID = 1L;
- private final ActorRef cohort;
- public TransactionCanCommitSuccess(final TransactionIdentifier identifier, final ActorRef cohort) {
+ public TransactionCanCommitSuccess(final TransactionIdentifier identifier) {
super(identifier);
- this.cohort = Preconditions.checkNotNull(cohort);
- }
-
- public ActorRef getCohort() {
- return cohort;
}
@Override
*/
package org.opendaylight.controller.cluster.access.commands;
-import akka.actor.ActorRef;
-import akka.serialization.JavaSerializer;
-import akka.serialization.Serialization;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
*/
final class TransactionCanCommitSuccessProxyV1 extends AbstractTransactionSuccessProxy<TransactionCanCommitSuccess> {
private static final long serialVersionUID = 1L;
- private ActorRef cohort;
public TransactionCanCommitSuccessProxyV1() {
// For Externalizable
TransactionCanCommitSuccessProxyV1(final TransactionCanCommitSuccess success) {
super(success);
- this.cohort = success.getCohort();
}
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
super.writeExternal(out);
- out.writeUTF(Serialization.serializedActorPath(cohort));
}
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
- cohort = JavaSerializer.currentSystem().value().provider().resolveActorRef(in.readUTF());
}
@Override
protected TransactionCanCommitSuccess createSuccess(final TransactionIdentifier target) {
- return new TransactionCanCommitSuccess(target, cohort);
+ return new TransactionCanCommitSuccess(target);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a coordinated commit request. It contains a reference to the actor which is handling the commit
+ * process.
+ *
+ * @author Robert Varga
+ */
+public final class TransactionCommitSuccess extends TransactionSuccess<TransactionCommitSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionCommitSuccess(final TransactionIdentifier identifier) {
+ super(identifier);
+ }
+
+ @Override
+ protected AbstractTransactionSuccessProxy<TransactionCommitSuccess> externalizableProxy(final ABIVersion version) {
+ return new TransactionCommitSuccessProxyV1(this);
+ }
+
+ @Override
+ protected TransactionCommitSuccess cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionCommitSuccess}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionCommitSuccessProxyV1 extends AbstractTransactionSuccessProxy<TransactionCommitSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionCommitSuccessProxyV1() {
+ // For Externalizable
+ }
+
+ TransactionCommitSuccessProxyV1(final TransactionCommitSuccess success) {
+ super(success);
+ }
+
+ @Override
+ protected TransactionCommitSuccess createSuccess(final TransactionIdentifier target) {
+ return new TransactionCommitSuccess(target);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A transaction request to perform the final, doCommit, step of the three-phase commit protocol.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class TransactionDoCommitRequest extends TransactionRequest<TransactionDoCommitRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionDoCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+ super(target, replyTo);
+ }
+
+ @Override
+ protected TransactionDoCommitRequestProxyV1 externalizableProxy(final ABIVersion version) {
+ return new TransactionDoCommitRequestProxyV1(this);
+ }
+
+ @Override
+ protected TransactionDoCommitRequest cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionDoCommitRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionDoCommitRequestProxyV1 extends AbstractTransactionRequestProxy<TransactionDoCommitRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionDoCommitRequestProxyV1() {
+ // For Externalizable
+ }
+
+ TransactionDoCommitRequestProxyV1(final TransactionDoCommitRequest request) {
+ super(request);
+ }
+
+ @Override
+ protected TransactionDoCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+ return new TransactionDoCommitRequest(target, replyTo);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A transaction request to perform the second, preCommit, step of the three-phase commit protocol.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class TransactionPreCommitRequest extends TransactionRequest<TransactionPreCommitRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionPreCommitRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+ super(target, replyTo);
+ }
+
+ @Override
+ protected TransactionPreCommitRequestProxyV1 externalizableProxy(final ABIVersion version) {
+ return new TransactionPreCommitRequestProxyV1(this);
+ }
+
+ @Override
+ protected TransactionPreCommitRequest cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionPreCommitRequest}. It implements the initial (Boron) serialization
+ * format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionPreCommitRequestProxyV1 extends AbstractTransactionRequestProxy<TransactionPreCommitRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionPreCommitRequestProxyV1() {
+ // For Externalizable
+ }
+
+ TransactionPreCommitRequestProxyV1(final TransactionPreCommitRequest request) {
+ super(request);
+ }
+
+ @Override
+ protected TransactionPreCommitRequest createRequest(final TransactionIdentifier target, final ActorRef replyTo) {
+ return new TransactionPreCommitRequest(target, replyTo);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a {@link TransactionPreCommitRequest}.
+ *
+ * @author Robert Varga
+ */
+public final class TransactionPreCommitSuccess extends TransactionSuccess<TransactionPreCommitSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionPreCommitSuccess(final TransactionIdentifier identifier) {
+ super(identifier);
+ }
+
+ @Override
+ protected AbstractTransactionSuccessProxy<TransactionPreCommitSuccess> externalizableProxy(final ABIVersion version) {
+ return new TransactionPreCommitSuccessProxyV1(this);
+ }
+
+ @Override
+ protected TransactionPreCommitSuccess cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionPreCommitSuccess}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionPreCommitSuccessProxyV1 extends AbstractTransactionSuccessProxy<TransactionPreCommitSuccess> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionPreCommitSuccessProxyV1() {
+ // For Externalizable
+ }
+
+ TransactionPreCommitSuccessProxyV1(final TransactionPreCommitSuccess success) {
+ super(success);
+ }
+
+ @Override
+ protected TransactionPreCommitSuccess createSuccess(final TransactionIdentifier target) {
+ return new TransactionPreCommitSuccess(target);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+
+/**
+ * Abstract base class for concrete {@link DOMStoreTransaction} implementations. It holds a reference to the associated
+ * {@link ClientTransaction}. This abstraction layer is needed to isolate end users, who interact with
+ * {@link DOMStoreTransaction} from the internal implementation.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractShardedTransaction implements DOMStoreTransaction {
+ private final ClientTransaction tx;
+
+ AbstractShardedTransaction(final ClientTransaction tx) {
+ this.tx = Preconditions.checkNotNull(tx);
+ }
+
+ @Override
+ public final Object getIdentifier() {
+ return tx.getIdentifier();
+ }
+
+ final ClientTransaction transaction() {
+ return tx;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Proxy implementation of {@link DOMStoreReadTransaction}. It routes all requests to the backing
+ * {@link ClientTransaction}. This class is not final to allow further subclassing by
+ * {@link ShardedDOMStoreReadWriteTransaction}.
+ *
+ * @author Robert Varga
+ */
+class ShardedDOMStoreReadTransaction extends AbstractShardedTransaction implements DOMStoreReadTransaction {
+ ShardedDOMStoreReadTransaction(final ClientTransaction tx) {
+ super(tx);
+ }
+
+ @Override
+ public final void close() {
+ transaction().abort();
+ }
+
+ @Override
+ public final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+ return transaction().read(path);
+ }
+
+ @Override
+ public final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+ return transaction().exists(path);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Proxy implementation of {@link DOMStoreReadWriteTransaction}. It routes all requests to the backing
+ * {@link ClientTransaction}.
+ *
+ * @author Robert Varga
+ */
+final class ShardedDOMStoreReadWriteTransaction extends ShardedDOMStoreReadTransaction implements DOMStoreReadWriteTransaction {
+
+ ShardedDOMStoreReadWriteTransaction(final ClientTransaction tx) {
+ super(tx);
+ }
+
+ @Override
+ public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ transaction().write(path, data);
+ }
+
+ @Override
+ public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ transaction().merge(path, data);
+ }
+
+ @Override
+ public void delete(final YangInstanceIdentifier path) {
+ transaction().delete(path);
+ }
+
+ @Override
+ public DOMStoreThreePhaseCommitCohort ready() {
+ return transaction().ready();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+
+/**
+ * Implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}. It wraps
+ * {@link ClientTransaction} into proxies like {@link ShardedDOMStoreReadTransaction} to provide isolation.
+ *
+ * @author Robert Varga
+ */
+final class ShardedDOMStoreTransactionChain implements DOMStoreTransactionChain {
+ private final ClientLocalHistory history;
+
+ ShardedDOMStoreTransactionChain(final ClientLocalHistory history) {
+ this.history = Preconditions.checkNotNull(history);
+ }
+
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction() {
+ return new ShardedDOMStoreReadTransaction(history.createTransaction());
+ }
+
+ @Override
+ public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ return new ShardedDOMStoreReadWriteTransaction(history.createTransaction());
+ }
+
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ return new ShardedDOMStoreWriteTransaction(history.createTransaction());
+ }
+
+ @Override
+ public void close() {
+ history.close();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Proxy implementation of {@link DOMStoreWriteTransaction}. It routes all requests to the backing
+ * {@link ClientTransaction}.
+ *
+ * @author Robert Varga
+ */
+final class ShardedDOMStoreWriteTransaction extends AbstractShardedTransaction implements DOMStoreWriteTransaction {
+ ShardedDOMStoreWriteTransaction(final ClientTransaction tx) {
+ super(tx);
+ }
+
+ @Override
+ public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ transaction().write(path, data);
+ }
+
+ @Override
+ public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ transaction().merge(path, data);
+ }
+
+ @Override
+ public void delete(final YangInstanceIdentifier path) {
+ transaction().delete(path);
+ }
+
+ @Override
+ public DOMStoreThreePhaseCommitCohort ready() {
+ return transaction().ready();
+ }
+
+ @Override
+ public final void close() {
+ transaction().abort();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
+ * and the other for single transactions.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
+ static enum State {
+ IDLE,
+ TX_OPEN,
+ CLOSED,
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+ private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
+
+ private final Map<Long, LocalHistoryIdentifier> histories = new ConcurrentHashMap<>();
+ private final DistributedDataStoreClientBehavior client;
+ private final LocalHistoryIdentifier identifier;
+
+ private volatile State state = State.IDLE;
+
+ AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
+ this.client = Preconditions.checkNotNull(client);
+ this.identifier = Preconditions.checkNotNull(identifier);
+ Preconditions.checkArgument(identifier.getCookie() == 0);
+ }
+
+ final State state() {
+ return state;
+ }
+
+ final void updateState(final State expected, final State next) {
+ final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
+ Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
+ }
+
+ final LocalHistoryIdentifier getHistoryForCookie(final Long cookie) {
+ LocalHistoryIdentifier ret = histories.get(cookie);
+ if (ret == null) {
+ ret = new LocalHistoryIdentifier(identifier.getClientId(), identifier.getHistoryId(), cookie);
+ final LocalHistoryIdentifier existing = histories.putIfAbsent(cookie, ret);
+ if (existing != null) {
+ ret = existing;
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
+ public final LocalHistoryIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ final DistributedDataStoreClientBehavior getClient() {
+ return client;
+ }
+
+ @Override
+ final void localAbort(final Throwable cause) {
+ LOG.debug("Force-closing history {}", getIdentifier(), cause);
+ state = State.CLOSED;
+ }
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a transaction has been sub
+ *
+ * @param transaction Transaction handle
+ */
+ void onTransactionReady(final ClientTransaction transaction) {
+ client.transactionComplete(transaction);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Class translating transaction operations towards a particular backend shard.
+ *
+ * This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
+ * transitions coming from interactions with backend are expected to be thread-safe.
+ *
+ * This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
+ * to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
+ private final DistributedDataStoreClientBehavior client;
+
+ private long sequence;
+ private boolean sealed;
+
+ AbstractProxyTransaction(final DistributedDataStoreClientBehavior client) {
+ this.client = Preconditions.checkNotNull(client);
+ }
+
+ /**
+ * Instantiate a new tracker for a transaction. This method bases its decision on which implementation to use
+ * based on provided {@link ShardBackendInfo}. If no information is present, it will choose the remote
+ * implementation, which is fine, as the queueing logic in ClientActorBehavior will hold on to the requests until
+ * the backend is located.
+ *
+ * @param client Client behavior
+ * @param historyId Local history identifier
+ * @param transactionId Transaction identifier
+ * @param backend Optional backend identifier
+ * @return A new state tracker
+ */
+ static AbstractProxyTransaction create(final DistributedDataStoreClientBehavior client,
+ final LocalHistoryIdentifier historyId, final long transactionId,
+ final java.util.Optional<ShardBackendInfo> backend) {
+
+ final java.util.Optional<DataTree> dataTree = backend.flatMap(t -> t.getDataTree());
+ final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId);
+ if (dataTree.isPresent()) {
+ return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot());
+ } else {
+ return new RemoteProxyTransaction(client, identifier);
+ }
+ }
+
+ final DistributedDataStoreClientBehavior client() {
+ return client;
+ }
+
+ final long nextSequence() {
+ return sequence++;
+ }
+
+ final void delete(final YangInstanceIdentifier path) {
+ checkSealed();
+ doDelete(path);
+ }
+
+ final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkSealed();
+ doMerge(path, data);
+ }
+
+ final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkSealed();
+ doWrite(path, data);
+ }
+
+ final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+ checkSealed();
+ return doExists(path);
+ }
+
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+ checkSealed();
+ return doRead(path);
+ }
+
+ /**
+ * Seal this transaction before it is either
+ */
+ final void seal() {
+ checkSealed();
+ doSeal();
+ sealed = true;
+ }
+
+ private void checkSealed() {
+ Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
+ }
+
+ /**
+ * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
+ * being sent to the backend.
+ */
+ final void abort() {
+ checkSealed();
+ doAbort();
+ }
+
+ /**
+ * Commit this transaction, possibly in a coordinated fashion.
+ *
+ * @param coordinated True if this transaction should be coordinated across multiple participants.
+ * @return Future completion
+ */
+ final ListenableFuture<Boolean> directCommit() {
+ checkSealed();
+
+ final SettableFuture<Boolean> ret = SettableFuture.create();
+ client().sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(false)), t -> {
+ if (t instanceof TransactionCommitSuccess) {
+ ret.set(Boolean.TRUE);
+ } else if (t instanceof RequestFailure) {
+ ret.setException(((RequestFailure<?, ?>) t).getCause());
+ } else {
+ ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
+ }
+ });
+ return ret;
+ }
+
+ void abort(final VotingFuture<Void> ret) {
+ checkSealed();
+
+ client.sendRequest(nextSequence(), new TransactionAbortRequest(getIdentifier(), client().self()), t -> {
+ if (t instanceof TransactionAbortSuccess) {
+ ret.voteYes();
+ } else if (t instanceof RequestFailure) {
+ ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+ } else {
+ ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ }
+ });
+ }
+
+ void canCommit(final VotingFuture<?> ret) {
+ checkSealed();
+
+ client.sendRequest(nextSequence(), Verify.verifyNotNull(doCommit(true)), t -> {
+ if (t instanceof TransactionCanCommitSuccess) {
+ ret.voteYes();
+ } else if (t instanceof RequestFailure) {
+ ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+ } else {
+ ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ }
+ });
+ }
+
+ void preCommit(final VotingFuture<?> ret) {
+ checkSealed();
+
+ client.sendRequest(nextSequence(), new TransactionPreCommitRequest(getIdentifier(), client().self()), t-> {
+ if (t instanceof TransactionPreCommitSuccess) {
+ ret.voteYes();
+ } else if (t instanceof RequestFailure) {
+ ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+ } else {
+ ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ }
+ });
+ }
+
+ void doCommit(final VotingFuture<?> ret) {
+ checkSealed();
+
+ client.sendRequest(nextSequence(), new TransactionDoCommitRequest(getIdentifier(), client().self()), t-> {
+ if (t instanceof TransactionCommitSuccess) {
+ ret.voteYes();
+ } else if (t instanceof RequestFailure) {
+ ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+ } else {
+ ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ }
+ });
+ }
+
+ abstract void doDelete(final YangInstanceIdentifier path);
+
+ abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+
+ abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+
+ abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
+
+ abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path);
+
+ abstract void doSeal();
+
+ abstract void doAbort();
+
+ abstract TransactionRequest<?> doCommit(boolean coordinated);
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * Base class for internal {@link DOMStoreThreePhaseCommitCohort} implementation. It contains utility constants for
+ * wide reuse.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractTransactionCommitCohort implements DOMStoreThreePhaseCommitCohort {
+ static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+ static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+
+}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import akka.actor.ActorRef;
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
/**
* Client-side view of a local history. This class tracks all state related to a particular history and routes
* @author Robert Varga
*/
@Beta
-public final class ClientLocalHistory implements AutoCloseable {
- private static final AtomicIntegerFieldUpdater<ClientLocalHistory> STATE_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(ClientLocalHistory.class, "state");
- private static final int IDLE_STATE = 0;
- private static final int CLOSED_STATE = 1;
+public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
- private final ClientIdentifier clientId;
- private final long historyId;
- private final ActorRef backendActor;
- private final ActorRef clientActor;
+ private static final AtomicLongFieldUpdater<ClientLocalHistory> NEXT_TX_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(ClientLocalHistory.class, "nextTx");
- private volatile int state = IDLE_STATE;
+ // Used via NEXT_TX_UPDATER
+ @SuppressWarnings("unused")
+ private volatile long nextTx = 0;
- ClientLocalHistory(final DistributedDataStoreClientBehavior client, final long historyId,
- final ActorRef backendActor) {
- this.clientActor = client.self();
- this.backendActor = Preconditions.checkNotNull(backendActor);
- this.clientId = Verify.verifyNotNull(client.getIdentifier());
- this.historyId = historyId;
+ ClientLocalHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
+ super(client, historyId);
}
- private void checkNotClosed() {
- if (state == CLOSED_STATE) {
- throw new IllegalStateException("Local history " + new LocalHistoryIdentifier(clientId, historyId) + " is closed");
- }
+ public ClientTransaction createTransaction() {
+ final State local = state();
+ Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
+ updateState(local, State.TX_OPEN);
+
+ return new ClientTransaction(getClient(), this,
+ new TransactionIdentifier(getIdentifier(), NEXT_TX_UPDATER.getAndIncrement(this)));
}
@Override
public void close() {
- if (STATE_UPDATER.compareAndSet(this, IDLE_STATE, CLOSED_STATE)) {
- // FIXME: signal close to both client actor and backend actor
- } else if (state != CLOSED_STATE) {
- throw new IllegalStateException("Cannot close history with an open transaction");
+ final State local = state();
+ if (local != State.CLOSED) {
+ Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this);
+ updateState(local, State.CLOSED);
}
}
- // FIXME: add client requests related to a particular local history
+ @Override
+ void onTransactionReady(final ClientTransaction transaction) {
+ final State local = state();
+ Verify.verify(local == State.TX_OPEN, "Local history %s is in unexpected state %s", this, local);
+ updateState(local, State.IDLE);
+ super.onTransactionReady(transaction);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client-side view of a free-standing transaction.
+ *
+ * This interface is used by the world outside of the actor system and in the actor system it is manifested via
+ * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to
+ * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor.
+ *
+ * It is internally composed of multiple {@link RemoteProxyTransaction}s, each responsible for a component shard.
+ *
+ * Implementation is quite a bit complex, and involves cooperation with {@link AbstractClientHistory} for tracking
+ * gaps in transaction identifiers seen by backends.
+ *
+ * These gaps need to be accounted for in the transaction setup message sent to a particular backend, so it can verify
+ * that the requested transaction is in-sequence. This is critical in ensuring that transactions (which are independent
+ * entities from message queueing perspective) do not get reodered -- thus allowing multiple in-flight transactions.
+ *
+ * Alternative would be to force visibility by sending an abort request to all potential backends, but that would mean
+ * that even empty transactions increase load on all shards -- which would be a scalability issue.
+ *
+ * Yet another alternative would be to introduce inter-transaction dependencies to the queueing layer in client actor,
+ * but that would require additional indirection and complexity.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class ClientTransaction extends LocalAbortable implements Identifiable<TransactionIdentifier> {
+ private static final Logger LOG = LoggerFactory.getLogger(ClientTransaction.class);
+ private static final AtomicIntegerFieldUpdater<ClientTransaction> STATE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ClientTransaction.class, "state");
+ private static final int OPEN_STATE = 0;
+ private static final int CLOSED_STATE = 1;
+
+ private final Map<Long, AbstractProxyTransaction> proxies = new HashMap<>();
+ private final TransactionIdentifier transactionId;
+ private final AbstractClientHistory parent;
+
+ private volatile int state = OPEN_STATE;
+
+ ClientTransaction(final DistributedDataStoreClientBehavior client, final AbstractClientHistory parent,
+ final TransactionIdentifier transactionId) {
+ this.transactionId = Preconditions.checkNotNull(transactionId);
+ this.parent = Preconditions.checkNotNull(parent);
+ }
+
+ private void checkNotClosed() {
+ Preconditions.checkState(state == OPEN_STATE, "Transaction %s is closed", transactionId);
+ }
+
+ private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) {
+ checkNotClosed();
+
+ final ModuleShardBackendResolver resolver = parent.getClient().resolver();
+ final Long shard = resolver.resolveShardForPath(path);
+ AbstractProxyTransaction ret = proxies.get(shard);
+ if (ret == null) {
+ ret = AbstractProxyTransaction.create(parent.getClient(), parent.getHistoryForCookie(shard),
+ transactionId.getTransactionId(), resolver.getFutureBackendInfo(shard));
+ proxies.put(shard, ret);
+ }
+ return ret;
+ }
+
+ @Override
+ public TransactionIdentifier getIdentifier() {
+ return transactionId;
+ }
+
+ public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+ return ensureProxy(path).exists(path);
+ }
+
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+ return ensureProxy(path).read(path);
+ }
+
+ public void delete(final YangInstanceIdentifier path) {
+ ensureProxy(path).delete(path);
+ }
+
+ public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ ensureProxy(path).merge(path, data);
+ }
+
+ public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ ensureProxy(path).write(path, data);
+ }
+
+ private boolean ensureClosed() {
+ final int local = state;
+ if (local != CLOSED_STATE) {
+ final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE);
+ Preconditions.checkState(success, "Transaction %s raced during close", this);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public DOMStoreThreePhaseCommitCohort ready() {
+ Preconditions.checkState(ensureClosed(), "Attempted to submit a closed transaction %s", this);
+
+ for (AbstractProxyTransaction p : proxies.values()) {
+ p.seal();
+ }
+ parent.onTransactionReady(this);
+
+ switch (proxies.size()) {
+ case 0:
+ return EmptyTransactionCommitCohort.INSTANCE;
+ case 1:
+ return new DirectTransactionCommitCohort(Iterables.getOnlyElement(proxies.values()));
+ default:
+ return new ClientTransactionCommitCohort(proxies.values());
+ }
+ }
+
+ /**
+ * Release all state associated with this transaction.
+ */
+ public void abort() {
+ if (ensureClosed()) {
+ for (AbstractProxyTransaction proxy : proxies.values()) {
+ proxy.abort();
+ }
+ proxies.clear();
+ }
+ }
+
+ @Override
+ void localAbort(final Throwable cause) {
+ LOG.debug("Aborting transaction {}", getIdentifier(), cause);
+ abort();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collection;
+import java.util.List;
+
+final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohort {
+ private final List<AbstractProxyTransaction> proxies;
+
+ /**
+ * @param clientTransaction
+ */
+ ClientTransactionCommitCohort(final Collection<AbstractProxyTransaction> proxies) {
+ this.proxies = ImmutableList.copyOf(proxies);
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ /*
+ * Issue the request to commit for all participants. We will track the results and report them.
+ */
+ final VotingFuture<Boolean> ret = new VotingFuture<>(Boolean.TRUE, proxies.size());
+ for (AbstractProxyTransaction proxy : proxies) {
+ proxy.canCommit(ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ final VotingFuture<Void> ret = new VotingFuture<>(null, proxies.size());
+ for (AbstractProxyTransaction proxy : proxies) {
+ proxy.preCommit(ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ final VotingFuture<Void> ret = new VotingFuture<>(null, proxies.size());
+ for (AbstractProxyTransaction proxy : proxies) {
+ proxy.doCommit(ret);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ final VotingFuture<Void> ret = new VotingFuture<>(null, proxies.size());
+ for (AbstractProxyTransaction proxy : proxies) {
+ proxy.abort(ret);
+ }
+
+ return ret;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * An {@link AbstractTransactionCommitCohort} implementation for transactions which contain a single proxy. Since there
+ * is only one proxy,
+ *
+ * @author Robert Varga
+ */
+final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohort {
+ private final AbstractProxyTransaction proxy;
+
+ /**
+ * @param clientTransaction
+ */
+ DirectTransactionCommitCohort(final AbstractProxyTransaction proxy) {
+ this.proxy = Preconditions.checkNotNull(proxy);
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return proxy.directCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return VOID_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return VOID_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ return VOID_FUTURE;
+ }
+}
package org.opendaylight.controller.cluster.databroker.actors.dds;
import com.google.common.annotations.Beta;
-import java.util.concurrent.CompletionStage;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
@Beta
public interface DistributedDataStoreClient extends Identifiable<ClientIdentifier>, AutoCloseable {
@Override
- ClientIdentifier getIdentifier();
+ @Nonnull ClientIdentifier getIdentifier();
@Override
void close();
/**
- * Create a new local history. This method initiates an asynchronous instantiation of a local history on the back
- * end. ClientLocalHistory represents the interface exposed to the client.
+ * Create a new local history. ClientLocalHistory represents the interface exposed to the client.
*
- * @return Future client history handle
+ * @return Client history handle
*/
- CompletionStage<ClientLocalHistory> createLocalHistory();
-
- // TODO: add methods required by DistributedDataStore
+ @Nonnull ClientLocalHistory createLocalHistory();
+ /**
+ * Create a new free-standing transaction.
+ *
+ * @return Client transaction handle
+ */
+ @Nonnull ClientTransaction createTransaction();
}
import akka.actor.ActorRef;
import akka.actor.Status;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
+import com.google.common.base.Throwables;
+import com.google.common.base.Verify;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
+ private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
+ private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
+ private final AtomicLong nextHistoryId = new AtomicLong(1);
+ private final AtomicLong nextTransactionId = new AtomicLong();
private final ModuleShardBackendResolver resolver;
- private long nextHistoryId;
+ private final SingleClientHistory singleHistory;
+
+ private volatile Throwable aborted;
DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
super(context);
resolver = new ModuleShardBackendResolver(actorContext);
+ singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
}
//
@Override
protected void haltClient(final Throwable cause) {
- // FIXME: Add state flushing here once we have state
+ // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up
+ // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
+ // thread.
+ if (aborted != null) {
+ abortOperations(cause);
+ }
}
- private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior,
- final CompletableFuture<ClientLocalHistory> future) {
- final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
- LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
+ private void abortOperations(final Throwable cause) {
+ // This acts as a barrier, application threads check this after they have added an entry in the maps,
+ // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
+ aborted = cause;
- // FIXME: initiate backend instantiation
- future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
- return currentBehavior;
+ for (ClientLocalHistory h : histories.values()) {
+ h.localAbort(cause);
+ }
+ histories.clear();
+
+ for (ClientTransaction t : transactions.values()) {
+ t.localAbort(cause);
+ }
+ transactions.clear();
}
- private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) {
- // FIXME: Add shutdown procedures here
+ private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
+ abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
return null;
}
@Override
- protected ClientActorBehavior onCommand(final Object command) {
+ protected DistributedDataStoreClientBehavior onCommand(final Object command) {
if (command instanceof GetClientRequest) {
((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
} else {
//
//
+ private static <K, V extends LocalAbortable> V returnIfOperational(final Map<K , V> map, final K key, final V value,
+ final Throwable aborted) {
+ Verify.verify(map.put(key, value) == null);
+
+ if (aborted != null) {
+ try {
+ value.localAbort(aborted);
+ } catch (Exception e) {
+ LOG.debug("Close of {} failed", value, e);
+ }
+ map.remove(key, value);
+ throw Throwables.propagate(aborted);
+ }
+
+ return value;
+ }
+
@Override
- public CompletionStage<ClientLocalHistory> createLocalHistory() {
- final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
- context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future));
- return future;
+ public ClientLocalHistory createLocalHistory() {
+ final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
+ nextHistoryId.getAndIncrement());
+ final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
+ LOG.debug("{}: creating a new local history {}", persistenceId(), history);
+
+ return returnIfOperational(histories, historyId, history, aborted);
+ }
+
+ @Override
+ public ClientTransaction createTransaction() {
+ final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
+ nextTransactionId.getAndIncrement());
+ final ClientTransaction tx = new ClientTransaction(this, singleHistory, txId);
+ LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
+
+ return returnIfOperational(transactions, txId, tx, aborted);
}
@Override
protected ModuleShardBackendResolver resolver() {
return resolver;
}
+
+ void transactionComplete(final ClientTransaction transaction) {
+ transactions.remove(transaction.getIdentifier());
+ }
+
+ void sendRequest(final long sequence, final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+ sendRequest(sequence, request, response -> {
+ completer.accept(response);
+ return this;
+ });
+ }
+
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * An {@link AbstractTransactionCommitCohort} for use with empty transactions. This relies on the fact that no backends
+ * have been touched, hence all state book-keeping needs to happen only locally and shares fate with the coordinator.
+ *
+ * Therefore all methods can finish immediately without any effects.
+ *
+ * @author Robert Varga
+ */
+final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort {
+ static final DOMStoreThreePhaseCommitCohort INSTANCE = new EmptyTransactionCommitCohort();
+
+ private EmptyTransactionCommitCohort() {
+ // Hidden
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return TRUE_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return VOID_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return VOID_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ return VOID_FUTURE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.function.Supplier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+
+/**
+ * An implementation of DataTreeModification which throws a specified {@link RuntimeException} when any of its methods
+ * are invoked.
+ *
+ * @author Robert Varga
+ */
+final class FailedDataTreeModification implements DataTreeModification {
+ private final Supplier<? extends RuntimeException> supplier;
+
+ FailedDataTreeModification(final Supplier<? extends RuntimeException> supplier) {
+ this.supplier = Preconditions.checkNotNull(supplier);
+ }
+
+ @Override
+ public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
+ throw supplier.get();
+ }
+
+ @Override
+ public DataTreeModification newModification() {
+ throw supplier.get();
+ }
+
+ @Override
+ public void delete(final YangInstanceIdentifier path) {
+ throw supplier.get();
+ }
+
+ @Override
+ public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ throw supplier.get();
+ }
+
+ @Override
+ public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ throw supplier.get();
+ }
+
+ @Override
+ public void ready() {
+ throw supplier.get();
+ }
+
+ @Override
+ public void applyToCursor(final DataTreeModificationCursor cursor) {
+ throw supplier.get();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+/**
+ * Common interface for client histories and client transactions, which can be aborted immediately without replicating
+ * the effect to the backend. This is needed for abrupt shutdowns.
+ *
+ * Since classes which need to expose this functionality do not need a base class, this is an abstract class and not
+ * an interface -- which allows us to not leak the {@link #localAbort(Throwable)} method.
+ *
+ * @author Robert Varga
+ */
+abstract class LocalAbortable {
+ /**
+ * Immediately abort this object.
+ *
+ * @param cause Failure which caused this abort.
+ */
+ abstract void localAbort(Throwable cause);
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with
+ * the client instance.
+ *
+ * It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations
+ * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard
+ * leader.
+ *
+ * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the
+ * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class LocalProxyTransaction extends AbstractProxyTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
+ private static final Consumer<Response<?, ?>> ABORT_COMPLETER = response -> {
+ LOG.debug("Abort completed with {}", response);
+ };
+
+ private final TransactionIdentifier identifier;
+ private DataTreeModification modification;
+
+ LocalProxyTransaction(final DistributedDataStoreClientBehavior client,
+ final TransactionIdentifier identifier, final DataTreeSnapshot snapshot) {
+ super(client);
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.modification = snapshot.newModification();
+ }
+
+ @Override
+ public TransactionIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ void doDelete(final YangInstanceIdentifier path) {
+ modification.delete(path);
+ }
+
+ @Override
+ void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ modification.merge(path, data);
+ }
+
+ @Override
+ void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ modification.write(path, data);
+ }
+
+ @Override
+ CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
+ return Futures.immediateCheckedFuture(modification.readNode(path).isPresent());
+ }
+
+ @Override
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
+ return Futures.immediateCheckedFuture(modification.readNode(path));
+ }
+
+ @Override
+ void doAbort() {
+ client().sendRequest(nextSequence(), new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
+ modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
+ }
+
+ @Override
+ CommitLocalTransactionRequest doCommit(final boolean coordinated) {
+ final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, client().self(),
+ modification, coordinated);
+ modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted"));
+ return ret;
+ }
+
+ @Override
+ void doSeal() {
+ modification.ready();
+ }
+}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.OnComplete;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableBiMap.Builder;
import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfo;
import org.opendaylight.controller.cluster.datastore.actors.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.compat.java8.FutureConverters;
+import scala.concurrent.ExecutionContext;
/**
* {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
* @author Robert Varga
*/
final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
+ private static final ExecutionContext DIRECT_EXECUTION_CONTEXT =
+ ExecutionContexts.fromExecutor(MoreExecutors.directExecutor());
+ private static final CompletableFuture<ShardBackendInfo> NULL_FUTURE = CompletableFuture.completedFuture(null);
private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
+
/**
* Fall-over-dead timeout. If we do not make progress in this long, just fall over and propagate the failure.
* All users are expected to fail, possibly attempting to recover by restarting. It is fair to remain
private final ActorContext actorContext;
- private volatile BiMap<String, Long> shards = ImmutableBiMap.of();
+ @GuardedBy("this")
+ private long nextShard = 1;
+
+ private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
// FIXME: we really need just ActorContext.findPrimaryShardAsync()
ModuleShardBackendResolver(final ActorContext actorContext) {
actorContext.getPrimaryShardInfoCache().remove(((ShardBackendInfo)result).getShardName());
}
+ Long resolveShardForPath(final YangInstanceIdentifier path) {
+ final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
+ Long cookie = shards.get(shardName);
+ if (cookie == null) {
+ synchronized (this) {
+ cookie = shards.get(shardName);
+ if (cookie == null) {
+ cookie = nextShard++;
+
+ Builder<String, Long> b = ImmutableBiMap.builder();
+ b.putAll(shards);
+ b.put(shardName, cookie);
+ shards = b.build();
+ }
+ }
+ }
+
+ return cookie;
+ }
+
@Override
- protected CompletionStage<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
+ protected CompletableFuture<ShardBackendInfo> resolveBackendInfo(final Long cookie) {
final String shardName = shards.inverse().get(cookie);
if (shardName == null) {
LOG.warn("Failing request for non-existent cookie {}", cookie);
- return CompletableFuture.completedFuture(null);
+ return NULL_FUTURE;
}
+ final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
+
+ actorContext.findPrimaryShardAsync(shardName).onComplete(new OnComplete<PrimaryShardInfo>() {
+ @Override
+ public void onComplete(final Throwable t, final PrimaryShardInfo v) {
+ if (t != null) {
+ ret.completeExceptionally(t);
+ } else {
+ ret.complete(createBackendInfo(v, shardName, cookie));
+ }
+ }
+ }, DIRECT_EXECUTION_CONTEXT);
+
LOG.debug("Resolving cookie {} to shard {}", cookie, shardName);
- return FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName))
- .thenApply(o -> createBackendInfo(o, shardName, cookie));
+ return ret;
}
private static ABIVersion toABIVersion(final short version) {
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader whose location is currently
+ * not known or is known to be not co-located with the client.
+ *
+ * It packages operations and sends them via the client actor queue to the shard leader. That queue is responsible for
+ * maintaining any submitted operations until the leader is discovered.
+ *
+ * This class is not safe to access from multiple application threads, as is usual for transactions. Its internal state
+ * transitions based on backend responses are thread-safe.
+ *
+ * @author Robert Varga
+ */
+final class RemoteProxyTransaction extends AbstractProxyTransaction {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteProxyTransaction.class);
+
+ // FIXME: make this tuneable
+ private static final int REQUEST_MAX_MODIFICATIONS = 1000;
+
+ private final ModifyTransactionRequestBuilder builder;
+
+ private boolean builderBusy;
+
+ private volatile Exception operationFailure;
+
+ RemoteProxyTransaction(final DistributedDataStoreClientBehavior client,
+ final TransactionIdentifier identifier) {
+ super(client);
+ builder = new ModifyTransactionRequestBuilder(identifier, client.self());
+ }
+
+ @Override
+ public TransactionIdentifier getIdentifier() {
+ return builder.getIdentifier();
+ }
+
+ @Override
+ void doDelete(final YangInstanceIdentifier path) {
+ appendModification(new TransactionDelete(path));
+ }
+
+ @Override
+ void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ appendModification(new TransactionMerge(path, data));
+ }
+
+ @Override
+ void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ appendModification(new TransactionWrite(path, data));
+ }
+
+ private <T> CheckedFuture<T, ReadFailedException> sendReadRequest(final AbstractReadTransactionRequest<?> request,
+ final Consumer<Response<?, ?>> completer, final ListenableFuture<T> future) {
+ // Check if a previous operation failed. If it has, do not bother sending anything and report a failure
+ final Exception local = operationFailure;
+ if (local != null) {
+ return Futures.immediateFailedCheckedFuture(new ReadFailedException("Previous operation failed", local));
+ }
+
+ // Make sure we send any modifications before issuing a read
+ ensureFlushedBuider();
+ client().sendRequest(nextSequence(), request, completer);
+ return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
+ }
+
+ @Override
+ CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
+ final SettableFuture<Boolean> future = SettableFuture.create();
+ return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), client().self(), path),
+ t -> completeExists(future, t), future);
+ }
+
+ @Override
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
+ return sendReadRequest(new ReadTransactionRequest(getIdentifier(), client().self(), path),
+ t -> completeRead(future, t), future);
+ }
+
+ @Override
+ void doAbort() {
+ ensureInitializedBuider();
+ builder.setAbort();
+ flushBuilder();
+ }
+
+ private void ensureInitializedBuider() {
+ if (!builderBusy) {
+ builderBusy = true;
+ }
+ }
+
+ private void ensureFlushedBuider() {
+ if (builderBusy) {
+ flushBuilder();
+ }
+ }
+
+ private void flushBuilder() {
+ client().sendRequest(nextSequence(), builder.build(), this::completeModify);
+ builderBusy = false;
+ }
+
+ private void appendModification(final TransactionModification modification) {
+ if (operationFailure == null) {
+ ensureInitializedBuider();
+
+ builder.addModification(modification);
+ if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
+ flushBuilder();
+ }
+ } else {
+ LOG.debug("Transaction {} failed, not attempting further transactions", getIdentifier());
+ }
+ }
+
+ private void completeModify(final Response<?, ?> response) {
+ LOG.debug("Modification request completed with {}", response);
+
+ if (response instanceof TransactionSuccess) {
+ // Happy path no-op
+ } else {
+ recordFailedResponse(response);
+ }
+ }
+
+ private Exception recordFailedResponse(final Response<?, ?> response) {
+ final Exception failure;
+ if (response instanceof RequestFailure) {
+ failure = ((RequestFailure<?, ?>) response).getCause();
+ } else {
+ LOG.warn("Unhandled response {}", response);
+ failure = new IllegalArgumentException("Unhandled response " + response.getClass());
+ }
+
+ if (operationFailure == null) {
+ LOG.debug("Transaction {} failed", getIdentifier(), failure);
+ operationFailure = failure;
+ }
+ return failure;
+ }
+
+ private void failFuture(final SettableFuture<?> future, final Response<?, ?> response) {
+ future.setException(recordFailedResponse(response));
+ }
+
+ private void completeExists(final SettableFuture<Boolean> future, final Response<?, ?> response) {
+ LOG.debug("Exists request completed with {}", response);
+
+ if (response instanceof ExistsTransactionSuccess) {
+ future.set(((ExistsTransactionSuccess) response).getExists());
+ } else {
+ failFuture(future, response);
+ }
+ }
+
+ private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future, final Response<?, ?> response) {
+ LOG.debug("Read request completed with {}", response);
+
+ if (response instanceof ReadTransactionSuccess) {
+ future.set(((ReadTransactionSuccess) response).getData());
+ } else {
+ failFuture(future, response);
+ }
+ }
+
+ @Override
+ ModifyTransactionRequest doCommit(final boolean coordinated) {
+ ensureInitializedBuider();
+ builder.setCommit(coordinated);
+
+ final ModifyTransactionRequest ret = builder.build();
+ builderBusy = false;
+ return ret;
+ }
+
+ @Override
+ void doSeal() {
+ // No-op
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+
+/**
+ * An {@link AbstractClientHistory} which handles free-standing transactions.
+ *
+ * @author Robert Varga
+ */
+final class SingleClientHistory extends AbstractClientHistory {
+ protected SingleClientHistory(final DistributedDataStoreClientBehavior client,
+ final LocalHistoryIdentifier identifier) {
+ super(client, identifier);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.AbstractFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * An {@link AbstractFuture} implementation which requires a certain number of votes before it completes. If all votes
+ * are 'yes', then it completes with a pre-determined value. If any of the votes are 'no', the future completes with
+ * an exception. This exception corresponds to the cause reported by the first 'no' vote, with all subsequent votes
+ * added as suppressed exceptions.
+ *
+ * Implementation is geared toward positive votes. Negative votes have to synchronize and therefore are more likely
+ * to see contention.
+ *
+ * @author Robert Varga
+ *
+ * @param <T> Type of value returned on success
+ */
+class VotingFuture<T> extends AbstractFuture<T> {
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater<VotingFuture> VOTES_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(VotingFuture.class, "neededVotes");
+
+ private final T result;
+
+ @GuardedBy("failures")
+ private final Collection<Throwable> failures = new ArrayList<>(0);
+ @SuppressWarnings("unused")
+ private volatile int neededVotes;
+
+ VotingFuture(final T result, final int requiredVotes) {
+ Preconditions.checkArgument(requiredVotes > 0);
+ this.neededVotes = requiredVotes;
+
+ // null is okay to allow Void type
+ this.result = result;
+ }
+
+ void voteYes() {
+ if (castVote()) {
+ synchronized (failures) {
+ resolveResult();
+ }
+ }
+ }
+
+ void voteNo(final Throwable cause) {
+ synchronized (failures) {
+ failures.add(cause);
+ if (castVote()) {
+ resolveResult();
+ }
+ }
+ }
+
+ private boolean castVote() {
+ final int votes = VOTES_UPDATER.decrementAndGet(this);
+ Verify.verify(votes >= 0);
+ return votes == 0;
+ }
+
+ @GuardedBy("failures")
+ private void resolveResult() {
+ final Iterator<Throwable> it = failures.iterator();
+ if (!it.hasNext()) {
+ set(result);
+ return;
+ }
+
+ final Throwable t = it.next();
+ while (it.hasNext()) {
+ t.addSuppressed(it.next());
+ }
+
+ setException(t);
+ }
+}
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
@ThreadSafe
public abstract class BackendInfoResolver<T extends BackendInfo> {
private static final Logger LOG = LoggerFactory.getLogger(BackendInfoResolver.class);
- private final ConcurrentMap<Long, CompletionStage<T>> backends = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, CompletableFuture<T>> backends = new ConcurrentHashMap<>();
- // This is what the client needs to start processing. For as long as we do not have this, we should not complete
- // this stage until we have this information
- public final CompletionStage<? extends T> getBackendInfo(final long cookie) {
- return backends.computeIfAbsent(cookie, this::resolveBackendInfo);
+ /**
+ * Return the currently-resolved backend information, if available. This method is guaranteed not to block, but will
+ * initiate resolution of the information if there is none.
+ *
+ * @param cookie Backend cookie
+ * @return Backend information, if available
+ */
+ public final Optional<T> getFutureBackendInfo(final Long cookie) {
+ final Future<T> f = lookupBackend(cookie);
+ if (f.isDone()) {
+ try {
+ return Optional.of(f.get());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.debug("Resolution of {} failed", f, e);
+ }
+ }
+
+ return Optional.empty();
}
/**
* requests information which is not currently cached.
*
* @param cookie Backend cookie
- * @return A {@link CompletionStage} resulting in information about the backend
+ * @return A {@link CompletableFuture} resulting in information about the backend
*/
- protected abstract @Nonnull CompletionStage<T> resolveBackendInfo(final @Nonnull Long cookie);
+ protected abstract @Nonnull CompletableFuture<T> resolveBackendInfo(final @Nonnull Long cookie);
/**
* Invalidate previously-resolved shard information. This method is invoked when a timeout is detected
* @param info Previous promise of backend information
*/
protected abstract void invalidateBackendInfo(@Nonnull CompletionStage<? extends BackendInfo> info);
+
+ // This is what the client needs to start processing. For as long as we do not have this, we should not complete
+ // this stage until we have this information
+ final CompletionStage<? extends T> getBackendInfo(final Long cookie) {
+ return lookupBackend(cookie);
+ }
+
+ private CompletableFuture<T> lookupBackend(final Long cookie) {
+ return backends.computeIfAbsent(Preconditions.checkNotNull(cookie), this::resolveBackendInfo);
+ }
}