X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractClientHistory.java;h=542cc2dbafb48d9e8db27fe632618af54209644e;hb=466078ab1dc8a8cc2981b161051f6edecd6af85a;hp=1be84643350acdd197f5ab5dfe10dfcf558b146e;hpb=5cb0787412ab63a3aa5dcc044511e1ce569662cf;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index 1be8464335..542cc2dbaf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -9,11 +9,13 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.StampedLock; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; @@ -23,7 +25,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.mdsal.common.api.TransactionChainClosedException; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; @@ -35,7 +37,7 @@ import org.slf4j.LoggerFactory; * * @author Robert Varga */ -abstract class AbstractClientHistory extends LocalAbortable implements Identifiable { +public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable { enum State { IDLE, TX_OPEN, @@ -53,7 +55,10 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia @GuardedBy("this") private final Map readyTransactions = new HashMap<>(); + @GuardedBy("lock") private final Map histories = new ConcurrentHashMap<>(); + private final StampedLock lock = new StampedLock(); + private final AbstractDataStoreClientBehavior client; private final LocalHistoryIdentifier identifier; @@ -79,8 +84,22 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia LOG.debug("Client history {} changed state from {} to {}", this, expected, next); } + final synchronized void doClose() { + final State local = state; + if (local != State.CLOSED) { + Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this); + histories.values().forEach(ProxyHistory::close); + updateState(local, State.CLOSED); + } + } + + final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) { + histories.remove(proxyHistory.getIdentifier().getCookie()); + LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory); + } + @Override - public final LocalHistoryIdentifier getIdentifier() { + public LocalHistoryIdentifier getIdentifier() { return identifier; } @@ -113,6 +132,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * * @throws InversibleLockException if the shard is being reconnected */ + @GuardedBy("lock") private ProxyHistory createHistoryProxy(final Long shard) { final AbstractClientConnection connection = client.getConnection(shard); final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(), @@ -129,8 +149,8 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia return ret; } - abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, - final AbstractClientConnection connection); + abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId, + AbstractClientConnection connection); private void createHistoryCallback(final Response response) { LOG.debug("Create history response {}", response); @@ -139,7 +159,14 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) { while (true) { try { - return histories.computeIfAbsent(shard, this::createHistoryProxy); + // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect, + // see comments in startReconnect() for details. + final long stamp = lock.readLock(); + try { + return histories.computeIfAbsent(shard, this::createHistoryProxy); + } finally { + lock.unlockRead(stamp); + } } catch (InversibleLockException e) { LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard); e.awaitResolution(); @@ -158,7 +185,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia private void checkNotClosed() { if (state == State.CLOSED) { - throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier)); + throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier)); } } @@ -166,10 +193,10 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * Allocate a new {@link ClientTransaction}. * * @return A new {@link ClientTransaction} - * @throws TransactionChainClosedException if this history is closed + * @throws DOMTransactionChainClosedException if this history is closed * @throws IllegalStateException if a previous dependent transaction has not been closed */ - public final ClientTransaction createTransaction() { + public ClientTransaction createTransaction() { checkNotClosed(); synchronized (this) { @@ -183,10 +210,10 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * Create a new {@link ClientSnapshot}. * * @return A new {@link ClientSnapshot} - * @throws TransactionChainClosedException if this history is closed + * @throws DOMTransactionChainClosedException if this history is closed * @throws IllegalStateException if a previous dependent transaction has not been closed */ - public final ClientSnapshot takeSnapshot() { + public ClientSnapshot takeSnapshot() { checkNotClosed(); synchronized (this) { @@ -248,7 +275,26 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } HistoryReconnectCohort startReconnect(final ConnectedClientConnection newConn) { - final ProxyHistory oldProxy = histories.get(newConn.cookie()); + /* + * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places. + * + * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied + * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does + * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated + * the history map. + * + * Hence we need to make sure no potential computation is happening concurrently with us looking at the history + * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation + * requests are established to happen either before or after the reconnect attempt. + */ + final ProxyHistory oldProxy; + final long stamp = lock.writeLock(); + try { + oldProxy = histories.get(newConn.cookie()); + } finally { + lock.unlockWrite(stamp); + } + if (oldProxy == null) { return null; } @@ -261,8 +307,8 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } @Override - void replaySuccessfulRequests(final Iterable previousEntries) { - proxy.replaySuccessfulRequests(previousEntries); + void replayRequests(final Collection previousEntries) { + proxy.replayRequests(previousEntries); } @Override @@ -276,4 +322,5 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } }; } + }