package org.opendaylight.controller.cluster.databroker.actors.dds;
import com.google.common.base.Preconditions;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+ private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
+ @GuardedBy("this")
+ private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
+ @GuardedBy("this")
+ private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
+
private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
private final DistributedDataStoreClientBehavior client;
private final LocalHistoryIdentifier identifier;
+ // Used via NEXT_TX_UPDATER
+ @SuppressWarnings("unused")
+ private volatile long nextTx = 0;
+
private volatile State state = State.IDLE;
AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
return client;
}
+ final long nextTx() {
+ return NEXT_TX_UPDATER.getAndIncrement(this);
+ }
+
@Override
final void localAbort(final Throwable cause) {
- LOG.debug("Force-closing history {}", getIdentifier(), cause);
- state = State.CLOSED;
+ final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
+ if (oldState != State.CLOSED) {
+ LOG.debug("Force-closing history {}", getIdentifier(), cause);
+
+ synchronized (this) {
+ for (ClientTransaction t : openTransactions.values()) {
+ t.localAbort(cause);
+ }
+ openTransactions.clear();
+ readyTransactions.clear();
+ }
+ }
}
private AbstractProxyHistory createHistoryProxy(final Long shard) {
- final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(identifier.getClientId(),
- identifier.getHistoryId(), shard);
- return AbstractProxyHistory.create(client, client.resolver().getFutureBackendInfo(shard), historyId);
+ return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
+ identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard));
}
+ abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+ final Optional<ShardBackendInfo> backendInfo);
+
final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
return history.createTransactionProxy(transactionId);
}
+ public final ClientTransaction createTransaction() {
+ Preconditions.checkState(state != State.CLOSED);
+
+ synchronized (this) {
+ final ClientTransaction ret = doCreateTransaction();
+ openTransactions.put(ret.getIdentifier(), ret);
+ return ret;
+ }
+ }
+
+ @GuardedBy("this")
+ abstract ClientTransaction doCreateTransaction();
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+ *
+ * @param txId Transaction identifier
+ * @param cohort Transaction commit cohort
+ */
+ synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+ final AbstractTransactionCommitCohort cohort) {
+ final ClientTransaction tx = openTransactions.remove(txId);
+ Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+
+ final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
+ Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
+ cohort, txId, previous);
+
+ return cohort;
+ }
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
+ * backend.
+ *
+ * @param txId transaction identifier
+ */
+ synchronized void onTransactionAbort(final TransactionIdentifier txId) {
+ if (openTransactions.remove(txId) == null) {
+ LOG.warn("Could not find aborting transaction {}", txId);
+ }
+ }
+
/**
- * Callback invoked from {@link ClientTransaction} when a transaction has been submitted.
+ * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
+ * and all its state can be removed.
*
- * @param transaction Transaction handle
+ * @param txId transaction identifier
*/
- void onTransactionReady(final ClientTransaction transaction) {
- client.transactionComplete(transaction);
+ synchronized void onTransactionComplete(final TransactionIdentifier txId) {
+ if (readyTransactions.remove(txId) == null) {
+ LOG.warn("Could not find completed transaction {}", txId);
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+abstract class AbstractLocalProxyHistory extends AbstractProxyHistory {
+ private final DataTree dataTree;
+
+ AbstractLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
+ final DataTree dataTree) {
+ super(client, identifier);
+ this.dataTree = Preconditions.checkNotNull(dataTree);
+ }
+
+ final DataTreeSnapshot takeSnapshot() {
+ return dataTree.takeSnapshot();
+ }
+}
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
/**
- * Per-connection representation of a local history.
+ * Per-connection representation of a local history. This class handles state replication across a single connection.
*
* @author Robert Varga
*/
this.identifier = Preconditions.checkNotNull(identifier);
}
- static AbstractProxyHistory create(final DistributedDataStoreClientBehavior client,
+ static AbstractProxyHistory createClient(final DistributedDataStoreClientBehavior client,
final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
- return dataTree.isPresent() ? new LocalProxyHistory(client, identifier, dataTree.get())
- : new RemoteProxyHistory(client, identifier);
+ return dataTree.isPresent() ? new ClientLocalProxyHistory(client, identifier, dataTree.get())
+ : new RemoteProxyHistory(client, identifier);
+ }
+
+ static AbstractProxyHistory createSingle(final DistributedDataStoreClientBehavior client,
+ final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
+ final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
+ return dataTree.isPresent() ? new SingleLocalProxyHistory(client, identifier, dataTree.get())
+ : new RemoteProxyHistory(client, identifier);
}
@Override
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
/**
static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+ private final AbstractClientHistory parent;
+ private final TransactionIdentifier txId;
+
+ AbstractTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId) {
+ this.parent = Preconditions.checkNotNull(parent);
+ this.txId = Preconditions.checkNotNull(txId);
+ }
+
+ final void complete() {
+ parent.onTransactionComplete(txId);
+ }
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).add("txId", txId).toString();
+ }
}
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.Optional;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
*/
@Beta
public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
-
- private static final AtomicLongFieldUpdater<ClientLocalHistory> NEXT_TX_UPDATER =
- AtomicLongFieldUpdater.newUpdater(ClientLocalHistory.class, "nextTx");
-
- // Used via NEXT_TX_UPDATER
- @SuppressWarnings("unused")
- private volatile long nextTx = 0;
-
ClientLocalHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
super(client, historyId);
}
- public ClientTransaction createTransaction() {
- final State local = state();
- Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
- updateState(local, State.TX_OPEN);
-
- return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(),
- NEXT_TX_UPDATER.getAndIncrement(this)));
- }
-
@Override
public void close() {
final State local = state();
}
@Override
- void onTransactionReady(final ClientTransaction transaction) {
+ ClientTransaction doCreateTransaction() {
+ final State local = state();
+ Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
+ updateState(local, State.TX_OPEN);
+
+ return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx()));
+ }
+
+ @Override
+ AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+ final AbstractTransactionCommitCohort cohort) {
+ // FIXME: deal with CLOSED here
final State local = state();
Verify.verify(local == State.TX_OPEN, "Local history %s is in unexpected state %s", this, local);
updateState(local, State.IDLE);
- super.onTransactionReady(transaction);
+
+ return super.onTransactionReady(txId, cohort);
+ }
+
+ @Override
+ AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+ final Optional<ShardBackendInfo> backendInfo) {
+ return AbstractProxyHistory.createClient(getClient(), backendInfo, historyId);
}
}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-final class LocalProxyHistory extends AbstractProxyHistory {
- private final DataTree dataTree;
-
- LocalProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier, DataTree dataTree) {
- super(client, identifier);
- this.dataTree = Preconditions.checkNotNull(dataTree);
+final class ClientLocalProxyHistory extends AbstractLocalProxyHistory {
+ ClientLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
+ final DataTree dataTree) {
+ super(client, identifier, dataTree);
}
@Override
final TransactionIdentifier txId) {
// FIXME: this violates history contract: we should use the last submitted transaction instead to ensure
// causality
- return new LocalProxyTransaction(client, txId, dataTree.takeSnapshot());
+ return new LocalProxyTransaction(client, txId, takeSnapshot());
}
}
\ No newline at end of file
for (AbstractProxyTransaction p : proxies.values()) {
p.seal();
}
- parent.onTransactionReady(this);
+ final AbstractTransactionCommitCohort cohort;
switch (proxies.size()) {
case 0:
- return EmptyTransactionCommitCohort.INSTANCE;
+ cohort = new EmptyTransactionCommitCohort(parent, transactionId);
+ break;
case 1:
- return new DirectTransactionCommitCohort(Iterables.getOnlyElement(proxies.values()));
+ cohort = new DirectTransactionCommitCohort(parent, transactionId,
+ Iterables.getOnlyElement(proxies.values()));
+ break;
default:
- return new ClientTransactionCommitCohort(proxies.values());
+ cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values());
+ break;
}
+
+ return parent.onTransactionReady(transactionId, cohort);
}
/**
proxy.abort();
}
proxies.clear();
+
+ parent.onTransactionAbort(transactionId);
}
}
LOG.debug("Aborting transaction {}", getIdentifier(), cause);
abort();
}
+
+ Map<Long, AbstractProxyTransaction> getProxies() {
+ return proxies;
+ }
}
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
-import java.util.List;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohort {
- private final List<AbstractProxyTransaction> proxies;
+ private final Collection<AbstractProxyTransaction> proxies;
- ClientTransactionCommitCohort(final Collection<AbstractProxyTransaction> proxies) {
+ ClientTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId,
+ final Collection<AbstractProxyTransaction> proxies) {
+ super(parent, txId);
this.proxies = ImmutableList.copyOf(proxies);
}
return ret;
}
+ private ListenableFuture<Void> addComplete(final ListenableFuture<Void> future) {
+ future.addListener(this::complete, MoreExecutors.directExecutor());
+ return future;
+ }
+
@Override
public ListenableFuture<Void> preCommit() {
final VotingFuture<Void> ret = new VotingFuture<>(null, proxies.size());
proxy.doCommit(ret);
}
- return ret;
+ return addComplete(ret);
}
@Override
proxy.abort(ret);
}
- return ret;
+ return addComplete(ret);
}
}
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
/**
* An {@link AbstractTransactionCommitCohort} implementation for transactions which contain a single proxy. Since there
final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohort {
private final AbstractProxyTransaction proxy;
- DirectTransactionCommitCohort(final AbstractProxyTransaction proxy) {
+ DirectTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId,
+ final AbstractProxyTransaction proxy) {
+ super(parent, txId);
this.proxy = Preconditions.checkNotNull(proxy);
}
@Override
public ListenableFuture<Void> abort() {
+ complete();
return VOID_FUTURE;
}
@Override
public ListenableFuture<Void> commit() {
+ complete();
return VOID_FUTURE;
}
}
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
- private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
private final AtomicLong nextHistoryId = new AtomicLong(1);
- private final AtomicLong nextTransactionId = new AtomicLong();
private final ModuleShardBackendResolver resolver;
private final SingleClientHistory singleHistory;
h.localAbort(cause);
}
histories.clear();
-
- for (ClientTransaction t : transactions.values()) {
- t.localAbort(cause);
- }
- transactions.clear();
}
private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
@Override
public ClientTransaction createTransaction() {
- final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
- nextTransactionId.getAndIncrement());
- final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
- LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
-
- return returnIfOperational(transactions, txId, tx, aborted);
+ return singleHistory.createTransaction();
}
@Override
return resolver;
}
- void transactionComplete(final ClientTransaction transaction) {
- transactions.remove(transaction.getIdentifier());
- }
-
void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
sendRequest(request, response -> {
completer.accept(response);
package org.opendaylight.controller.cluster.databroker.actors.dds;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
/**
* An {@link AbstractTransactionCommitCohort} for use with empty transactions. This relies on the fact that no backends
* @author Robert Varga
*/
final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort {
- static final DOMStoreThreePhaseCommitCohort INSTANCE = new EmptyTransactionCommitCohort();
-
- private EmptyTransactionCommitCohort() {
- // Hidden
+ EmptyTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId) {
+ super(parent, txId);
}
@Override
@Override
public ListenableFuture<Void> abort() {
+ complete();
return VOID_FUTURE;
}
@Override
public ListenableFuture<Void> commit() {
+ complete();
return VOID_FUTURE;
}
}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import java.util.Optional;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An {@link AbstractClientHistory} which handles free-standing transactions.
* @author Robert Varga
*/
final class SingleClientHistory extends AbstractClientHistory {
- protected SingleClientHistory(final DistributedDataStoreClientBehavior client,
- final LocalHistoryIdentifier identifier) {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+
+ SingleClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
super(client, identifier);
}
+
+ @Override
+ ClientTransaction doCreateTransaction() {
+ final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx());
+ LOG.debug("{}: creating a new transaction {}", this, txId);
+
+ return new ClientTransaction(this, txId);
+ }
+
+ @Override
+ AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+ final Optional<ShardBackendInfo> backendInfo) {
+ return AbstractProxyHistory.createSingle(getClient(), backendInfo, historyId);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+final class SingleLocalProxyHistory extends AbstractLocalProxyHistory {
+ SingleLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
+ final DataTree dataTree) {
+ super(client, identifier, dataTree);
+ }
+
+ @Override
+ AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
+ final TransactionIdentifier txId) {
+ return new LocalProxyTransaction(client, txId, takeSnapshot());
+ }
+}
\ No newline at end of file