2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.HashMap;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
19 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
21 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
22 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
23 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
24 import org.opendaylight.controller.cluster.access.concepts.Response;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
27 import org.opendaylight.yangtools.concepts.Identifiable;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
34 * and the other for single transactions.
36 * @author Robert Varga
38 abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
// Class-wide logger.
45 private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
// Atomic updater bound by name to the volatile "nextTx" counter field declared below.
46 private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
47 AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
// Atomic updater bound by name to the volatile "state" field declared below.
48 private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
49 AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
// Handles created through this history and not yet readied or aborted, keyed by transaction identifier.
// Every access visible in this file happens while holding this object's monitor.
52 private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
// Commit cohorts of readied transactions, removed again in onTransactionComplete().
// Every access visible in this file happens while holding this object's monitor.
54 private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
// Per-shard history proxies, keyed by the shard value (looked up by connection cookie in startReconnect()).
// Concurrent map: populated on demand through computeIfAbsent() without taking this object's monitor.
56 private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
// Owning client behavior, used to resolve shards and to obtain shard connections.
57 private final AbstractDataStoreClientBehavior client;
// Identifier of this history; the constructor requires its cookie to be 0.
58 private final LocalHistoryIdentifier identifier;
60 // Used via NEXT_TX_UPDATER
61 @SuppressWarnings("unused")
62 private volatile long nextTx = 0;
// Current lifecycle state, initially IDLE. Written through STATE_UPDATER, read directly in checkNotClosed().
64 private volatile State state = State.IDLE;
// Creates a history view bound to the given client behavior and history identifier.
// Both arguments must be non-null; checkNotNull throws NullPointerException otherwise.
66 AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
67 this.client = Preconditions.checkNotNull(client);
68 this.identifier = Preconditions.checkNotNull(identifier);
// The history-level identifier must carry cookie 0, otherwise checkArgument throws IllegalArgumentException.
// Per-shard identifiers are built separately in createHistoryProxy(Long).
69 Preconditions.checkArgument(identifier.getCookie() == 0);
// Atomically moves this history from state `expected` to state `next`.
// Throws IllegalStateException (via checkState) when the current state is not `expected`.
76 final void updateState(final State expected, final State next) {
// Single compare-and-set on the volatile state field; there is no retry on failure.
77 final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
// The failure message reports the state as re-read after the failed CAS, not the value the CAS itself observed.
78 Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
79 LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
// Accessor with return type LocalHistoryIdentifier, matching the Identifiable type argument of this class.
// NOTE(review): the method body is not visible in this excerpt - presumably `return identifier;`, confirm.
83 public final LocalHistoryIdentifier getIdentifier() {
// Atomically returns the current value of the nextTx counter and then increments it.
// NOTE(review): the signature of the method enclosing this statement is not visible in this excerpt - confirm.
88 return NEXT_TX_UPDATER.getAndIncrement(this);
// Resolves the shard for the given path by delegating to the owning client behavior; no local logic is applied.
91 final Long resolveShardForPath(final YangInstanceIdentifier path) {
92 return client.resolveShardForPath(path);
// Force-closes this history: switches the state to CLOSED and drops all tracked transactions.
// `cause` is passed to the logger as the throwable argument.
// NOTE(review): the loop body is not visible in this excerpt - presumably each open transaction is aborted with `cause`, confirm.
96 final void localAbort(final Throwable cause) {
// Unconditional swap to CLOSED; the previous value tells whether this call is the one that performed the close.
97 final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
// Cleanup is skipped when the history was already CLOSED.
98 if (oldState != State.CLOSED) {
99 LOG.debug("Force-closing history {}", getIdentifier(), cause);
// The tracking maps are plain HashMaps, so they are only touched while holding this object's monitor.
101 synchronized (this) {
102 for (AbstractClientHandle<?> t : openTransactions.values()) {
105 openTransactions.clear();
106 readyTransactions.clear();
112 * Create a new history proxy for a given shard.
114 * @throws InversibleLockException if the shard is being reconnected
// Builds the proxy for one shard: obtains the shard connection, derives a per-shard history identifier,
// lets the subclass instantiate the proxy and, for histories with a non-zero history id, sends a
// CreateLocalHistoryRequest over that connection.
116 private ProxyHistory createHistoryProxy(final Long shard) {
117 final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
// Same client id and history id as this history, with the shard as the third identifier component.
118 final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
119 identifier.getHistoryId(), shard);
120 LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
// Actual construction is delegated to the abstract two-argument overload.
122 final ProxyHistory ret = createHistoryProxy(proxyId, connection);
124 // Request creation of the history, if it is not the single history
125 if (ret.getIdentifier().getHistoryId() != 0) {
// The response is handed to createHistoryCallback(), which only logs it.
126 connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
127 this::createHistoryCallback);
// Subclass hook: instantiate the ProxyHistory for the given per-shard history identifier and shard connection.
// Invoked from the private createHistoryProxy(Long) overload.
132 abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
133 final AbstractClientConnection<ShardBackendInfo> connection);
// Completion callback registered with the CreateLocalHistoryRequest sent from createHistoryProxy(Long).
// The response is logged at debug level.
// NOTE(review): no success/failure handling is visible in this excerpt - confirm nothing else is done here.
135 private void createHistoryCallback(final Response<?, ?> response) {
136 LOG.debug("Create history response {}", response);
// Returns the history proxy for the given shard, creating it on first use.
// In the visible code transactionId is used only for trace logging.
139 private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
// computeIfAbsent invokes createHistoryProxy(Long) only when no proxy is mapped for this shard yet.
142 return histories.computeIfAbsent(shard, this::createHistoryProxy);
// createHistoryProxy(Long) is documented to throw this while the shard is being reconnected.
143 } catch (InversibleLockException e) {
144 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
// NOTE(review): the wait between these two log statements and the enclosing retry construct are not visible
// in this excerpt - confirm how the retry is driven.
146 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
// Creates a proxy transaction on the given shard's history proxy, creating that history proxy first if needed.
// Passes `true` as the second argument - presumably the "snapshot only" flag, confirm against ProxyHistory.
151 final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) {
152 return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
// Creates a proxy transaction on the given shard's history proxy, creating that history proxy first if needed.
// Differs from createSnapshotProxy() only in passing `false` as the second argument.
155 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
156 return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
// Guard: throws TransactionChainClosedException when this history has reached the CLOSED state.
159 private void checkNotClosed() {
// Plain volatile read of the state field; the check is not performed under this object's monitor here.
160 if (state == State.CLOSED) {
161 throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
166 * Allocate a new {@link ClientTransaction}.
168 * @return A new {@link ClientTransaction}
169 * @throws TransactionChainClosedException if this history is closed
170 * @throws IllegalStateException if a previous dependent transaction has not been closed
// NOTE(review): the lines between the signature and the synchronized block are not visible in this excerpt;
// the Javadoc promises TransactionChainClosedException for a closed history - confirm the check is made there.
172 public final ClientTransaction createTransaction() {
// Creation and registration both happen under this object's monitor, the same lock that guards openTransactions elsewhere.
175 synchronized (this) {
// The concrete subclass decides how the transaction is created.
176 final ClientTransaction ret = doCreateTransaction();
// Track the new handle until it is readied or aborted.
177 openTransactions.put(ret.getIdentifier(), ret);
183 * Create a new {@link ClientSnapshot}.
185 * @return A new {@link ClientSnapshot}
186 * @throws TransactionChainClosedException if this history is closed
187 * @throws IllegalStateException if a previous dependent transaction has not been closed
// NOTE(review): the lines between the signature and the synchronized block are not visible in this excerpt;
// the Javadoc promises TransactionChainClosedException for a closed history - confirm the check is made there.
189 public final ClientSnapshot takeSnapshot() {
// Creation and registration both happen under this object's monitor, the same lock that guards openTransactions elsewhere.
192 synchronized (this) {
// The concrete subclass decides how the snapshot is created.
193 final ClientSnapshot ret = doCreateSnapshot();
// Snapshots are tracked in the same map as read-write transactions.
194 openTransactions.put(ret.getIdentifier(), ret);
// Subclass hook producing a new ClientSnapshot; called from takeSnapshot() while this object's monitor is held.
200 abstract ClientSnapshot doCreateSnapshot();
// Subclass hook producing a new ClientTransaction; called from createTransaction() while this object's monitor is held.
203 abstract ClientTransaction doCreateTransaction();
206 * Callback invoked from {@link ClientTransaction} when a child transaction is readied for submission.
209 * @param tx Transaction being readied
209 * @param cohort Transaction commit cohort
// Moves a transaction from the open set to the ready set. Synchronized, so it runs under this object's monitor.
// Throws IllegalStateException when a cohort is already registered for the same transaction identifier.
211 synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
212 final AbstractTransactionCommitCohort cohort) {
213 final TransactionIdentifier txId = tx.getIdentifier();
// A missing open-transaction record is only warned about; readiness still proceeds.
214 if (openTransactions.remove(txId) == null) {
215 LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
// putIfAbsent leaves an existing cohort in place so the duplicate can be reported by the check below.
218 final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
219 Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
220 cohort, txId, previous);
222 LOG.debug("Local history {} readied transaction {}", this, txId);
227 * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
230 * @param snapshot handle of the transaction being aborted
// Stops tracking an aborted handle. Synchronized, so openTransactions is modified under this object's monitor.
232 synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
// An unknown handle is tolerated: it is logged at warn level and nothing is thrown in the visible code.
233 if (openTransactions.remove(snapshot.getIdentifier()) == null) {
234 LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
239 * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
240 * and all its state can be removed.
242 * @param txId transaction identifier
// Drops the commit cohort of a completed transaction. Synchronized, so readyTransactions is modified under this object's monitor.
244 synchronized void onTransactionComplete(final TransactionIdentifier txId) {
// An unknown identifier is tolerated: it is logged at warn level and nothing is thrown in the visible code.
245 if (readyTransactions.remove(txId) == null) {
246 LOG.warn("Could not find completed transaction {}", txId);
// Begins reconnecting this history's proxy for the shard identified by newConn.cookie() and returns a cohort
// that replays requests and, on close(), swaps the new proxy into the histories map.
250 HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
251 final ProxyHistory oldProxy = histories.get(newConn.cookie());
// No proxy exists for this shard. NOTE(review): the branch body is not visible in this excerpt - confirm what is returned.
252 if (oldProxy == null) {
// The proxy-level cohort must be non-null; verifyNotNull throws VerifyException otherwise.
256 final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
// Anonymous cohort wrapping the proxy-level cohort and capturing oldProxy, proxy and newConn.
257 return new HistoryReconnectCohort() {
259 ProxyReconnectCohort getProxy() {
// Pure delegation to the proxy-level cohort.
264 void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
265 proxy.replaySuccessfulRequests(previousEntries);
// Finishes the reconnect and installs the resulting proxy.
269 public void close() {
270 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
271 final ProxyHistory newProxy = proxy.finishReconnect();
// Conditional replace: succeeds only when the map still holds oldProxy for this cookie.
// A mismatch is logged at warn level; nothing is thrown in the visible code.
272 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
273 LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
274 AbstractClientHistory.this);