2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
15 import java.util.Collection;
16 import java.util.HashMap;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
21 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
22 import java.util.concurrent.locks.StampedLock;
23 import java.util.stream.Stream;
24 import org.checkerframework.checker.lock.qual.GuardedBy;
25 import org.checkerframework.checker.lock.qual.Holding;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
29 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
30 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
31 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
33 import org.opendaylight.controller.cluster.access.concepts.Response;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
36 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
37 import org.opendaylight.yangtools.concepts.Identifiable;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
44 * and the other for single transactions.
46 * @author Robert Varga
48 public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
55 private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
56 private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
57 AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
58 private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
59 AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
62 private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
64 private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
67 private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
68 private final StampedLock lock = new StampedLock();
70 private final @NonNull AbstractDataStoreClientBehavior client;
71 private final @NonNull LocalHistoryIdentifier identifier;
73 // Used via NEXT_TX_UPDATER
74 @SuppressWarnings("unused")
75 private volatile long nextTx = 0;
77 private volatile State state = State.IDLE;
79 AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
80 this.client = requireNonNull(client);
81 this.identifier = requireNonNull(identifier);
82 checkArgument(identifier.getCookie() == 0);
89 final void updateState(final State expected, final State next) {
90 final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
91 checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
92 LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
95 final synchronized void doClose() {
96 final State local = state;
97 if (local != State.CLOSED) {
98 checkState(local == State.IDLE, "Local history %s has an open transaction", this);
99 histories.values().forEach(ProxyHistory::close);
100 updateState(local, State.CLOSED);
104 final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) {
105 histories.remove(proxyHistory.getIdentifier().getCookie());
106 LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory);
110 public final LocalHistoryIdentifier getIdentifier() {
114 final long nextTx() {
115 return NEXT_TX_UPDATER.getAndIncrement(this);
118 final Long resolveShardForPath(final YangInstanceIdentifier path) {
119 return client.resolveShardForPath(path);
122 final Stream<Long> resolveAllShards() {
123 return client.resolveAllShards();
126 final ActorUtils actorUtils() {
127 return client.actorUtils();
131 final void localAbort(final Throwable cause) {
132 final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
133 if (oldState != State.CLOSED) {
134 LOG.debug("Force-closing history {}", getIdentifier(), cause);
136 synchronized (this) {
137 for (AbstractClientHandle<?> t : openTransactions.values()) {
140 openTransactions.clear();
141 readyTransactions.clear();
147 * Create a new history proxy for a given shard.
149 * @param shard Shard cookie
150 * @throws InversibleLockException if the shard is being reconnected
153 private ProxyHistory createHistoryProxy(final Long shard) {
154 final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
155 final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
156 identifier.getHistoryId(), shard);
157 LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
159 final ProxyHistory ret = createHistoryProxy(proxyId, connection);
161 // Request creation of the history, if it is not the single history
162 if (ret.getIdentifier().getHistoryId() != 0) {
163 connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
164 this::createHistoryCallback);
169 abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
170 AbstractClientConnection<ShardBackendInfo> connection);
172 private void createHistoryCallback(final Response<?, ?> response) {
173 LOG.debug("Create history response {}", response);
176 private @NonNull ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
179 // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
180 // see comments in startReconnect() for details.
181 final long stamp = lock.readLock();
183 return histories.computeIfAbsent(shard, this::createHistoryProxy);
185 lock.unlockRead(stamp);
187 } catch (InversibleLockException e) {
188 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
190 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
195 final @NonNull AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId,
197 return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
200 final @NonNull AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId,
202 return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
205 private void checkNotClosed() {
206 if (state == State.CLOSED) {
207 throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier));
212 * Allocate a new {@link ClientTransaction}.
214 * @return A new {@link ClientTransaction}
215 * @throws DOMTransactionChainClosedException if this history is closed
216 * @throws IllegalStateException if a previous dependent transaction has not been closed
218 // Non-final for mocking
219 public @NonNull ClientTransaction createTransaction() {
222 synchronized (this) {
223 final ClientTransaction ret = doCreateTransaction();
224 openTransactions.put(ret.getIdentifier(), ret);
230 * Create a new {@link ClientSnapshot}.
232 * @return A new {@link ClientSnapshot}
233 * @throws DOMTransactionChainClosedException if this history is closed
234 * @throws IllegalStateException if a previous dependent transaction has not been closed
236 // Non-final for mocking
237 public ClientSnapshot takeSnapshot() {
240 synchronized (this) {
241 final ClientSnapshot ret = doCreateSnapshot();
242 openTransactions.put(ret.getIdentifier(), ret);
248 abstract ClientSnapshot doCreateSnapshot();
251 abstract ClientTransaction doCreateTransaction();
254 * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is
255 * completing with a set of participating shards.
257 * @param txId Transaction identifier
258 * @param participatingShards Participating shard cookies
260 final void onTransactionShardsBound(final TransactionIdentifier txId, final Set<Long> participatingShards) {
261 // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those
262 // will not see the holes caused by this.
263 final long stamp = lock.readLock();
265 for (var entry : histories.entrySet()) {
266 if (!participatingShards.contains(entry.getKey())) {
267 entry.getValue().skipTransaction(txId);
271 lock.unlockRead(stamp);
276 * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
278 * @param tx Client transaction
279 * @param cohort Transaction commit cohort
281 synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
282 final AbstractTransactionCommitCohort cohort) {
283 final TransactionIdentifier txId = tx.getIdentifier();
284 if (openTransactions.remove(txId) == null) {
285 LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
288 final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
289 checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", cohort, txId, previous);
291 LOG.debug("Local history {} readied transaction {}", this, txId);
296 * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
299 * @param snapshot transaction identifier
301 synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
302 if (openTransactions.remove(snapshot.getIdentifier()) == null) {
303 LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
308 * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
309 * and all its state can be removed.
311 * @param txId transaction identifier
313 // Non-final for mocking
314 synchronized void onTransactionComplete(final TransactionIdentifier txId) {
315 if (readyTransactions.remove(txId) == null) {
316 LOG.warn("Could not find completed transaction {}", txId);
320 final HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
322 * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
324 * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
325 * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
326 * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
329 * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
330 * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
331 * requests are established to happen either before or after the reconnect attempt.
333 final ProxyHistory oldProxy;
334 final long stamp = lock.writeLock();
336 oldProxy = histories.get(newConn.cookie());
338 lock.unlockWrite(stamp);
341 if (oldProxy == null) {
345 final ProxyReconnectCohort proxy = verifyNotNull(oldProxy.startReconnect(newConn));
346 return new HistoryReconnectCohort() {
348 ProxyReconnectCohort getProxy() {
353 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
354 proxy.replayRequests(previousEntries);
358 public void close() {
359 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
360 final ProxyHistory newProxy = proxy.finishReconnect();
361 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
362 LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
363 AbstractClientHistory.this);