2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.HashMap;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
19 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
20 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
21 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
22 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
23 import org.opendaylight.controller.cluster.access.concepts.Response;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
26 import org.opendaylight.yangtools.concepts.Identifiable;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
32 * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
33 * and the other for single transactions.
35 * @author Robert Varga
37 abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
// Class-wide SLF4J logger.
44 private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
// Atomic accessor for the volatile nextTx counter declared below; used to fetch-and-increment it.
45 private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
46 AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
// Atomic accessor for the volatile state field declared below; used for compare-and-set and get-and-set
// transitions of the history state.
47 private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
48 AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
// Transactions created by this history which have been neither readied nor aborted yet. This is a plain
// HashMap; the code in this class touches it only while holding this object's monitor.
51 private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
// Commit cohorts of transactions which have been readied but not yet completed. Also a plain HashMap,
// likewise touched only while holding this object's monitor.
53 private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
// History proxies keyed by shard; a concurrent map, populated lazily through computeIfAbsent().
55 private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
// Owning client behavior, used to resolve shards and to obtain shard connections.
56 private final AbstractDataStoreClientBehavior client;
// Identifier of this history; the constructor requires its cookie to be zero.
57 private final LocalHistoryIdentifier identifier;
59 // Used via NEXT_TX_UPDATER
60 @SuppressWarnings("unused")
61 private volatile long nextTx = 0;
// Current state, starting out as IDLE; modified through STATE_UPDATER.
63 private volatile State state = State.IDLE;
65 AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
66 this.client = Preconditions.checkNotNull(client);
67 this.identifier = Preconditions.checkNotNull(identifier);
68 Preconditions.checkArgument(identifier.getCookie() == 0);
75 final void updateState(final State expected, final State next) {
76 final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
77 Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
78 LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
// Accessor for this history's LocalHistoryIdentifier.
// NOTE(review): the method body is not visible in this excerpt; presumably it returns the identifier
// field -- confirm against the full file.
82 public final LocalHistoryIdentifier getIdentifier() {
// Returns the current value of the nextTx counter and atomically advances it by one.
// NOTE(review): the header of the enclosing method is not visible in this excerpt.
87 return NEXT_TX_UPDATER.getAndIncrement(this);
90 final Long resolveShardForPath(final YangInstanceIdentifier path) {
91 return client.resolveShardForPath(path);
// Force-closes this history: the state is switched to CLOSED and, if the history was not closed already,
// the bookkeeping of open and ready transactions is torn down.
95 final void localAbort(final Throwable cause) {
// getAndSet() makes the switch to CLOSED atomic, so the cleanup below runs only for the caller which
// actually observed a non-CLOSED state.
96 final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
97 if (oldState != State.CLOSED) {
98 LOG.debug("Force-closing history {}", getIdentifier(), cause);
// The transaction maps are plain HashMaps, hence they are accessed under this object's monitor.
100 synchronized (this) {
// NOTE(review): the loop body is not visible in this excerpt; presumably every open transaction is
// aborted with the same cause -- confirm against the full file.
101 for (ClientTransaction t : openTransactions.values()) {
// Forget all open and ready transactions.
104 openTransactions.clear();
105 readyTransactions.clear();
111 * Create a new history proxy for a given shard.
113 * @throws InversibleLockException if the shard is being reconnected
115 private ProxyHistory createHistoryProxy(final Long shard) {
116 final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
117 final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
118 identifier.getHistoryId(), shard);
119 LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
121 final ProxyHistory ret = createHistoryProxy(proxyId, connection);
123 // Request creation of the history, if it is not the single history
124 if (ret.getIdentifier().getHistoryId() != 0) {
125 connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
126 this::createHistoryCallback);
131 abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
132 final AbstractClientConnection<ShardBackendInfo> connection);
134 private void createHistoryCallback(final Response<?, ?> response) {
135 LOG.debug("Create history response {}", response);
// Creates the proxy of a transaction towards a particular shard, instantiating the shard's ProxyHistory on
// first use.
// NOTE(review): several lines of this method (the scaffolding around the try/catch) are not visible in this
// excerpt; the comments below describe only what is shown.
138 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
140 final ProxyHistory history;
// Look up the history proxy of the shard, creating it through createHistoryProxy(Long) if it is absent.
142 history = histories.computeIfAbsent(shard, this::createHistoryProxy);
// createHistoryProxy(Long) is documented to throw this while the shard is being reconnected.
143 } catch (InversibleLockException e) {
144 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
// NOTE(review): the wait itself is not visible here; judging by the log messages the lookup is retried once
// the connection has resolved -- confirm against the full file.
146 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
// Delegate the actual proxy creation to the per-shard history.
150 return history.createTransactionProxy(transactionId);
155 * Allocate a {@link ClientTransaction}.
157 * @return A new {@link ClientTransaction}
158 * @throws TransactionChainClosedException if this history is closed
160 public final ClientTransaction createTransaction() {
161 if (state == State.CLOSED) {
162 throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
165 synchronized (this) {
166 final ClientTransaction ret = doCreateTransaction();
167 openTransactions.put(ret.getIdentifier(), ret);
173 abstract ClientTransaction doCreateTransaction();
176 * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
178 * @param txId Transaction identifier
179 * @param cohort Transaction commit cohort
// Moves a transaction from the set of open transactions to the set of ready ones, recording its commit cohort.
// Runs under this object's monitor, which guards both transaction maps.
181 synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
182 final AbstractTransactionCommitCohort cohort) {
// The transaction has to be tracked as open; checkState() throws IllegalStateException otherwise.
183 final ClientTransaction tx = openTransactions.remove(txId);
184 Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
// Register the cohort without overwriting one registered earlier for the same transaction; a duplicate is
// reported as IllegalStateException. Note the transaction has already been removed from the open set by then.
186 final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
187 Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
188 cohort, txId, previous);
190 LOG.debug("Local history {} readied transaction {}", this, txId);
// NOTE(review): the return statement is not visible in this excerpt; presumably the supplied cohort is
// returned -- confirm against the full file.
195 * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
198 * @param txId transaction identifier
200 synchronized void onTransactionAbort(final TransactionIdentifier txId) {
201 if (openTransactions.remove(txId) == null) {
202 LOG.warn("Could not find aborting transaction {}", txId);
207 * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
208 * and all its state can be removed.
210 * @param txId transaction identifier
212 synchronized void onTransactionComplete(final TransactionIdentifier txId) {
213 if (readyTransactions.remove(txId) == null) {
214 LOG.warn("Could not find completed transaction {}", txId);
218 HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
219 final ProxyHistory oldProxy = histories.get(newConn.cookie());
220 if (oldProxy == null) {
224 final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
225 return new HistoryReconnectCohort() {
227 ProxyReconnectCohort getProxy() {
232 void replaySuccessfulRequests() {
233 proxy.replaySuccessfulRequests();
237 public void close() {
238 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
239 final ProxyHistory newProxy = proxy.finishReconnect();
240 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
241 LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
242 AbstractClientHistory.this);