import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.StampedLock;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
*
* @author Robert Varga
*/
-abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
+public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
enum State {
IDLE,
TX_OPEN,
@GuardedBy("this")
private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
+ @GuardedBy("lock")
private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
+ private final StampedLock lock = new StampedLock();
+
private final AbstractDataStoreClientBehavior client;
private final LocalHistoryIdentifier identifier;
LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
}
+ final synchronized void doClose() {
+ final State local = state;
+ if (local != State.CLOSED) {
+ Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this);
+ histories.values().forEach(ProxyHistory::close);
+ updateState(local, State.CLOSED);
+ }
+ }
+
+ final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) {
+ histories.remove(proxyHistory.getIdentifier().getCookie());
+ LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory);
+ }
+
@Override
- public final LocalHistoryIdentifier getIdentifier() {
+ public LocalHistoryIdentifier getIdentifier() {
return identifier;
}
*
* @throws InversibleLockException if the shard is being reconnected
*/
+ @GuardedBy("lock")
private ProxyHistory createHistoryProxy(final Long shard) {
final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
return ret;
}
- abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
- final AbstractClientConnection<ShardBackendInfo> connection);
+ abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
+ AbstractClientConnection<ShardBackendInfo> connection);
private void createHistoryCallback(final Response<?, ?> response) {
LOG.debug("Create history response {}", response);
private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
while (true) {
try {
- return histories.computeIfAbsent(shard, this::createHistoryProxy);
+ // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
+ // see comments in startReconnect() for details.
+ final long stamp = lock.readLock();
+ try {
+ return histories.computeIfAbsent(shard, this::createHistoryProxy);
+ } finally {
+ lock.unlockRead(stamp);
+ }
} catch (InversibleLockException e) {
LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
e.awaitResolution();
* @throws TransactionChainClosedException if this history is closed
* @throws IllegalStateException if a previous dependent transaction has not been closed
*/
- public final ClientTransaction createTransaction() {
+ public ClientTransaction createTransaction() {
checkNotClosed();
synchronized (this) {
* @throws TransactionChainClosedException if this history is closed
* @throws IllegalStateException if a previous dependent transaction has not been closed
*/
- public final ClientSnapshot takeSnapshot() {
+ public ClientSnapshot takeSnapshot() {
checkNotClosed();
synchronized (this) {
}
HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
- final ProxyHistory oldProxy = histories.get(newConn.cookie());
+ /*
+ * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
+ *
+ * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
+ * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
+ * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
+ * the history map.
+ *
+ * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
+ * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
+ * requests are established to happen either before or after the reconnect attempt.
+ */
+ final ProxyHistory oldProxy;
+ final long stamp = lock.writeLock();
+ try {
+ oldProxy = histories.get(newConn.cookie());
+ } finally {
+ lock.unlockWrite(stamp);
+ }
+
if (oldProxy == null) {
return null;
}
}
};
}
+
}