X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractClientHistory.java;h=542cc2dbafb48d9e8db27fe632618af54209644e;hb=127042ea7e148d9dc0282acc3780b4754ca69e12;hp=951b540f1de3f4550ba8efaac4f8a207e479caea;hpb=320a4e5cd2d9d80468a3f82798744f2035488218;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index 951b540f1d..542cc2dbaf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -9,19 +9,23 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.StampedLock; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.client.InversibleLockException; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; @@ -33,7 +37,7 @@ import org.slf4j.LoggerFactory; * * @author Robert Varga */ -abstract class AbstractClientHistory extends LocalAbortable implements Identifiable { +public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable { enum State { IDLE, TX_OPEN, @@ -47,11 +51,14 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state"); @GuardedBy("this") - private final Map openTransactions = new HashMap<>(); + private final Map> openTransactions = new HashMap<>(); @GuardedBy("this") private final Map readyTransactions = new HashMap<>(); + @GuardedBy("lock") private final Map histories = new ConcurrentHashMap<>(); + private final StampedLock lock = new StampedLock(); + private final AbstractDataStoreClientBehavior client; private final LocalHistoryIdentifier identifier; @@ -77,8 +84,22 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia LOG.debug("Client history {} changed state from {} to {}", this, expected, next); } + final synchronized void doClose() { + final State local = state; + if (local != State.CLOSED) { + Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this); + histories.values().forEach(ProxyHistory::close); + updateState(local, State.CLOSED); + } + } + + final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) { + histories.remove(proxyHistory.getIdentifier().getCookie()); + LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory); + } + @Override - public final LocalHistoryIdentifier getIdentifier() { + public LocalHistoryIdentifier getIdentifier() { return identifier; } @@ -97,7 +118,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia LOG.debug("Force-closing history {}", getIdentifier(), cause); synchronized (this) { - for (ClientTransaction t : openTransactions.values()) { + for (AbstractClientHandle t : openTransactions.values()) { t.localAbort(cause); } openTransactions.clear(); @@ -111,42 +132,72 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * * @throws InversibleLockException if the shard is being reconnected */ + @GuardedBy("lock") private ProxyHistory createHistoryProxy(final Long shard) { final AbstractClientConnection connection = client.getConnection(shard); - final ProxyHistory ret = createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(), - identifier.getHistoryId(), shard), connection); + final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(), + identifier.getHistoryId(), shard); + LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard); + + final ProxyHistory ret = createHistoryProxy(proxyId, connection); - // Request creation of the history. - connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()), - this::createHistoryCallback); + // Request creation of the history, if it is not the single history + if (ret.getIdentifier().getHistoryId() != 0) { + connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()), + this::createHistoryCallback); + } return ret; } - abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, - final AbstractClientConnection connection); + abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId, + AbstractClientConnection connection); private void createHistoryCallback(final Response response) { LOG.debug("Create history response {}", response); } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { + private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) { while (true) { - final ProxyHistory history; try { - history = histories.computeIfAbsent(shard, this::createHistoryProxy); + // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect, + // see comments in startReconnect() for details. + final long stamp = lock.readLock(); + try { + return histories.computeIfAbsent(shard, this::createHistoryProxy); + } finally { + lock.unlockRead(stamp); + } } catch (InversibleLockException e) { LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard); e.awaitResolution(); LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard); - continue; } + } + } + + final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) { + return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true); + } + + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { + return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false); + } - return history.createTransactionProxy(transactionId); + private void checkNotClosed() { + if (state == State.CLOSED) { + throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier)); } } - public final ClientTransaction createTransaction() { - Preconditions.checkState(state != State.CLOSED); + /** + * Allocate a new {@link ClientTransaction}. + * + * @return A new {@link ClientTransaction} + * @throws DOMTransactionChainClosedException if this history is closed + * @throws IllegalStateException if a previous dependent transaction has not been closed + */ + public ClientTransaction createTransaction() { + checkNotClosed(); synchronized (this) { final ClientTransaction ret = doCreateTransaction(); @@ -155,6 +206,26 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } } + /** + * Create a new {@link ClientSnapshot}. + * + * @return A new {@link ClientSnapshot} + * @throws DOMTransactionChainClosedException if this history is closed + * @throws IllegalStateException if a previous dependent transaction has not been closed + */ + public ClientSnapshot takeSnapshot() { + checkNotClosed(); + + synchronized (this) { + final ClientSnapshot ret = doCreateSnapshot(); + openTransactions.put(ret.getIdentifier(), ret); + return ret; + } + } + + @GuardedBy("this") + abstract ClientSnapshot doCreateSnapshot(); + @GuardedBy("this") abstract ClientTransaction doCreateTransaction(); @@ -164,10 +235,12 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * @param txId Transaction identifier * @param cohort Transaction commit cohort */ - synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId, + synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx, final AbstractTransactionCommitCohort cohort) { - final ClientTransaction tx = openTransactions.remove(txId); - Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId); + final TransactionIdentifier txId = tx.getIdentifier(); + if (openTransactions.remove(txId) == null) { + LOG.warn("Transaction {} not recorded, proceeding with readiness", txId); + } final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort); Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", @@ -181,11 +254,11 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching * backend. * - * @param txId transaction identifier + * @param snapshot transaction identifier */ - synchronized void onTransactionAbort(final TransactionIdentifier txId) { - if (openTransactions.remove(txId) == null) { - LOG.warn("Could not find aborting transaction {}", txId); + synchronized void onTransactionAbort(final AbstractClientHandle snapshot) { + if (openTransactions.remove(snapshot.getIdentifier()) == null) { + LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier()); } } @@ -202,7 +275,26 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } HistoryReconnectCohort startReconnect(final ConnectedClientConnection newConn) { - final ProxyHistory oldProxy = histories.get(newConn.cookie()); + /* + * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places. + * + * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied + * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does + * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated + * the history map. + * + * Hence we need to make sure no potential computation is happening concurrently with us looking at the history + * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation + * requests are established to happen either before or after the reconnect attempt. + */ + final ProxyHistory oldProxy; + final long stamp = lock.writeLock(); + try { + oldProxy = histories.get(newConn.cookie()); + } finally { + lock.unlockWrite(stamp); + } + if (oldProxy == null) { return null; } @@ -215,8 +307,8 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } @Override - void replaySuccessfulRequests() { - proxy.replaySuccessfulRequests(); + void replayRequests(final Collection previousEntries) { + proxy.replayRequests(previousEntries); } @Override @@ -230,4 +322,5 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } }; } + }