2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.HashMap;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
19 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
20 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
21 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
22 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
23 import org.opendaylight.controller.cluster.access.concepts.Response;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.yangtools.concepts.Identifiable;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
32 * and the other for single transactions.
34 * @author Robert Varga
36 abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
43 private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
44 private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
45 AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
46 private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
47 AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
50 private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
52 private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
54 private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
55 private final AbstractDataStoreClientBehavior client;
56 private final LocalHistoryIdentifier identifier;
58 // Used via NEXT_TX_UPDATER
59 @SuppressWarnings("unused")
60 private volatile long nextTx = 0;
62 private volatile State state = State.IDLE;
64 AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
65 this.client = Preconditions.checkNotNull(client);
66 this.identifier = Preconditions.checkNotNull(identifier);
67 Preconditions.checkArgument(identifier.getCookie() == 0);
74 final void updateState(final State expected, final State next) {
75 final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
76 Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
77 LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
81 public final LocalHistoryIdentifier getIdentifier() {
86 return NEXT_TX_UPDATER.getAndIncrement(this);
89 final Long resolveShardForPath(final YangInstanceIdentifier path) {
90 return client.resolveShardForPath(path);
94 final void localAbort(final Throwable cause) {
95 final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
96 if (oldState != State.CLOSED) {
97 LOG.debug("Force-closing history {}", getIdentifier(), cause);
100 for (ClientTransaction t : openTransactions.values()) {
103 openTransactions.clear();
104 readyTransactions.clear();
110 * Create a new history proxy for a given shard.
112 * @throws InversibleLockException if the shard is being reconnected
114 private ProxyHistory createHistoryProxy(final Long shard) {
115 final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
116 final ProxyHistory ret = createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
117 identifier.getHistoryId(), shard), connection);
119 // Request creation of the history.
120 connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
121 this::createHistoryCallback);
125 abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
126 final AbstractClientConnection<ShardBackendInfo> connection);
128 private void createHistoryCallback(final Response<?, ?> response) {
129 LOG.debug("Create history response {}", response);
132 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
134 final ProxyHistory history;
136 history = histories.computeIfAbsent(shard, this::createHistoryProxy);
137 } catch (InversibleLockException e) {
138 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
140 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
144 return history.createTransactionProxy(transactionId);
148 public final ClientTransaction createTransaction() {
149 Preconditions.checkState(state != State.CLOSED);
151 synchronized (this) {
152 final ClientTransaction ret = doCreateTransaction();
153 openTransactions.put(ret.getIdentifier(), ret);
159 abstract ClientTransaction doCreateTransaction();
162 * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
164 * @param txId Transaction identifier
165 * @param cohort Transaction commit cohort
167 synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
168 final AbstractTransactionCommitCohort cohort) {
169 final ClientTransaction tx = openTransactions.remove(txId);
170 Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
172 final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
173 Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
174 cohort, txId, previous);
176 LOG.debug("Local history {} readied transaction {}", this, txId);
181 * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
184 * @param txId transaction identifier
186 synchronized void onTransactionAbort(final TransactionIdentifier txId) {
187 if (openTransactions.remove(txId) == null) {
188 LOG.warn("Could not find aborting transaction {}", txId);
193 * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
194 * and all its state can be removed.
196 * @param txId transaction identifier
198 synchronized void onTransactionComplete(final TransactionIdentifier txId) {
199 if (readyTransactions.remove(txId) == null) {
200 LOG.warn("Could not find completed transaction {}", txId);
204 HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
205 final ProxyHistory oldProxy = histories.get(newConn.cookie());
206 if (oldProxy == null) {
210 final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
211 return new HistoryReconnectCohort() {
213 ProxyReconnectCohort getProxy() {
218 void replaySuccessfulRequests() {
219 proxy.replaySuccessfulRequests();
223 public void close() {
224 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
225 final ProxyHistory newProxy = proxy.finishReconnect();
226 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
227 LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
228 AbstractClientHistory.this);