2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import com.google.common.base.Preconditions;
11 import java.util.HashMap;
13 import java.util.Optional;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.yangtools.concepts.Identifiable;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
25 * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
26 * and the other for single transactions.
28 * @author Robert Varga
30 abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
37 private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
38 private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
39 AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
40 private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
41 AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
44 private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
46 private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
48 private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
49 private final DistributedDataStoreClientBehavior client;
50 private final LocalHistoryIdentifier identifier;
52 // Used via NEXT_TX_UPDATER
53 @SuppressWarnings("unused")
54 private volatile long nextTx = 0;
56 private volatile State state = State.IDLE;
58 AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
59 this.client = Preconditions.checkNotNull(client);
60 this.identifier = Preconditions.checkNotNull(identifier);
61 Preconditions.checkArgument(identifier.getCookie() == 0);
68 final void updateState(final State expected, final State next) {
69 final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
70 Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
74 public final LocalHistoryIdentifier getIdentifier() {
78 final DistributedDataStoreClientBehavior getClient() {
83 return NEXT_TX_UPDATER.getAndIncrement(this);
87 final void localAbort(final Throwable cause) {
88 final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
89 if (oldState != State.CLOSED) {
90 LOG.debug("Force-closing history {}", getIdentifier(), cause);
93 for (ClientTransaction t : openTransactions.values()) {
96 openTransactions.clear();
97 readyTransactions.clear();
102 private AbstractProxyHistory createHistoryProxy(final Long shard) {
103 return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
104 identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard));
107 abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
108 final Optional<ShardBackendInfo> backendInfo);
110 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
111 final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
112 return history.createTransactionProxy(transactionId);
115 public final ClientTransaction createTransaction() {
116 Preconditions.checkState(state != State.CLOSED);
118 synchronized (this) {
119 final ClientTransaction ret = doCreateTransaction();
120 openTransactions.put(ret.getIdentifier(), ret);
126 abstract ClientTransaction doCreateTransaction();
129 * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
131 * @param txId Transaction identifier
132 * @param cohort Transaction commit cohort
134 synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
135 final AbstractTransactionCommitCohort cohort) {
136 final ClientTransaction tx = openTransactions.remove(txId);
137 Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
139 final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
140 Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
141 cohort, txId, previous);
147 * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
150 * @param txId transaction identifier
152 synchronized void onTransactionAbort(final TransactionIdentifier txId) {
153 if (openTransactions.remove(txId) == null) {
154 LOG.warn("Could not find aborting transaction {}", txId);
159 * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
160 * and all its state can be removed.
162 * @param txId transaction identifier
164 synchronized void onTransactionComplete(final TransactionIdentifier txId) {
165 if (readyTransactions.remove(txId) == null) {
166 LOG.warn("Could not find completed transaction {}", txId);