private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
- private final Map<Long, LocalHistoryIdentifier> histories = new ConcurrentHashMap<>();
+ private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
private final DistributedDataStoreClientBehavior client;
private final LocalHistoryIdentifier identifier;
Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
}
- private LocalHistoryIdentifier getHistoryForCookie(final Long cookie) {
- LocalHistoryIdentifier ret = histories.get(cookie);
- if (ret == null) {
- ret = new LocalHistoryIdentifier(identifier.getClientId(), identifier.getHistoryId(), cookie);
- final LocalHistoryIdentifier existing = histories.putIfAbsent(cookie, ret);
- if (existing != null) {
- ret = existing;
- }
- }
-
- return ret;
- }
-
@Override
public final LocalHistoryIdentifier getIdentifier() {
return identifier;
state = State.CLOSED;
}
+ private AbstractProxyHistory createHistoryProxy(final Long shard) {
+ final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(identifier.getClientId(),
+ identifier.getHistoryId(), shard);
+ return AbstractProxyHistory.create(client, client.resolver().getFutureBackendInfo(shard), historyId);
+ }
+
final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
- return AbstractProxyTransaction.create(client, getHistoryForCookie(shard),
- transactionId.getTransactionId(), client.resolver().getFutureBackendInfo(shard));
+ final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+ return history.createTransactionProxy(transactionId);
}
/**
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Per-connection representation of a local history.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractProxyHistory implements Identifiable<LocalHistoryIdentifier> {
+ // FIXME: this should really be ClientConnection
+ private final DistributedDataStoreClientBehavior client;
+ private final LocalHistoryIdentifier identifier;
+
+ AbstractProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
+ this.client = Preconditions.checkNotNull(client);
+ this.identifier = Preconditions.checkNotNull(identifier);
+ }
+
+ static AbstractProxyHistory create(final DistributedDataStoreClientBehavior client,
+ final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
+ final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
+ return dataTree.isPresent() ? new LocalProxyHistory(client, identifier, dataTree.get()) : new RemoteProxyHistory(client, identifier);
+ }
+
+ @Override
+ public LocalHistoryIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ final ActorRef localActor() {
+ return client.self();
+ }
+
+ final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
+ return doCreateTransactionProxy(client, new TransactionIdentifier(identifier, txId.getTransactionId()));
+ }
+
+ abstract AbstractProxyTransaction doCreateTransactionProxy(DistributedDataStoreClientBehavior client,
+ TransactionIdentifier txId);
+}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import akka.actor.ActorRef;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
/**
* Class translating transaction operations towards a particular backend shard.
this.client = Preconditions.checkNotNull(client);
}
- /**
- * Instantiate a new tracker for a transaction. This method bases its decision on which implementation to use
- * based on provided {@link ShardBackendInfo}. If no information is present, it will choose the remote
- * implementation, which is fine, as the queueing logic in ClientActorBehavior will hold on to the requests until
- * the backend is located.
- *
- * @param client Client behavior
- * @param historyId Local history identifier
- * @param transactionId Transaction identifier
- * @param backend Optional backend identifier
- * @return A new state tracker
- */
- static AbstractProxyTransaction create(final DistributedDataStoreClientBehavior client,
- final LocalHistoryIdentifier historyId, final long transactionId,
- final java.util.Optional<ShardBackendInfo> backend) {
-
- final java.util.Optional<DataTree> dataTree = backend.flatMap(ShardBackendInfo::getDataTree);
- final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId);
- if (dataTree.isPresent()) {
- return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot());
- } else {
- return new RemoteProxyTransaction(client, identifier);
- }
- }
-
- final DistributedDataStoreClientBehavior client() {
- return client;
+ final ActorRef localActor() {
+ return client.self();
}
final long nextSequence() {
return doRead(path);
}
+ final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+ client.sendRequest(request, completer);
+ }
+
/**
* Seal this transaction before it is either
*/
checkSealed();
final SettableFuture<Boolean> ret = SettableFuture.create();
- client().sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
+ sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
if (t instanceof TransactionCommitSuccess) {
ret.set(Boolean.TRUE);
} else if (t instanceof RequestFailure) {
void abort(final VotingFuture<Void> ret) {
checkSealed();
- client.sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), client().self()), t -> {
+ sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), t -> {
if (t instanceof TransactionAbortSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
void canCommit(final VotingFuture<?> ret) {
checkSealed();
- client.sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
+ sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
if (t instanceof TransactionCanCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
void preCommit(final VotingFuture<?> ret) {
checkSealed();
- client.sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
+ sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> {
if (t instanceof TransactionPreCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
void doCommit(final VotingFuture<?> ret) {
checkSealed();
- client.sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
+ sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> {
if (t instanceof TransactionCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
updateState(local, State.TX_OPEN);
- return new ClientTransaction(getClient(), this,
- new TransactionIdentifier(getIdentifier(), NEXT_TX_UPDATER.getAndIncrement(this)));
+ return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(),
+ NEXT_TX_UPDATER.getAndIncrement(this)));
}
@Override
private volatile int state = OPEN_STATE;
- ClientTransaction(final DistributedDataStoreClientBehavior client, final AbstractClientHistory parent,
- final TransactionIdentifier transactionId) {
+ ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
this.transactionId = Preconditions.checkNotNull(transactionId);
this.parent = Preconditions.checkNotNull(parent);
}
public ClientTransaction createTransaction() {
final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
nextTransactionId.getAndIncrement());
- final ClientTransaction tx = new ClientTransaction(this, singleHistory, txId);
+ final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
return returnIfOperational(transactions, txId, tx, aborted);
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+final class LocalProxyHistory extends AbstractProxyHistory {
+ private final DataTree dataTree;
+
+ LocalProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier, DataTree dataTree) {
+ super(client, identifier);
+ this.dataTree = Preconditions.checkNotNull(dataTree);
+ }
+
+ @Override
+ AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
+ final TransactionIdentifier txId) {
+ // FIXME: this violates history contract: we should use the last submitted transaction instead to ensure
+ // causality
+ return new LocalProxyTransaction(client, txId, dataTree.takeSnapshot());
+ }
+}
\ No newline at end of file
@Override
void doAbort() {
- client().sendRequest(new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
+ sendRequest(new AbortLocalTransactionRequest(identifier, localActor()), ABORT_COMPLETER);
modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
}
@Override
CommitLocalTransactionRequest doCommit(final boolean coordinated) {
- final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, client().self(),
+ final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
modification, coordinated);
modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted"));
return ret;
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+final class RemoteProxyHistory extends AbstractProxyHistory {
+ RemoteProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier) {
+ super(client, identifier);
+ }
+
+ @Override
+ AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
+ final TransactionIdentifier txId) {
+ return new RemoteProxyTransaction(client, txId);
+ }
+}
\ No newline at end of file
RemoteProxyTransaction(final DistributedDataStoreClientBehavior client,
final TransactionIdentifier identifier) {
super(client);
- builder = new ModifyTransactionRequestBuilder(identifier, client.self());
+ builder = new ModifyTransactionRequestBuilder(identifier, localActor());
}
@Override
// Make sure we send any modifications before issuing a read
ensureFlushedBuider();
- client().sendRequest(request, completer);
+ sendRequest(request, completer);
return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
}
@Override
CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
final SettableFuture<Boolean> future = SettableFuture.create();
- return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
+ return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
t -> completeExists(future, t), future);
}
@Override
CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
- return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
+ return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
t -> completeRead(future, t), future);
}
}
private void flushBuilder() {
- client().sendRequest(builder.build(), this::completeModify);
+ final ModifyTransactionRequest message = builder.build();
builderBusy = false;
+
+ sendRequest(message, this::completeModify);
}
private void appendModification(final TransactionModification modification) {