BUG-5280: move transactions keeping to history
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import java.util.HashMap;
12 import java.util.Map;
13 import java.util.Optional;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.yangtools.concepts.Identifiable;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 /**
25  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
26  * and the other for single transactions.
27  *
28  * @author Robert Varga
29  */
30 abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
31     enum State {
32         IDLE,
33         TX_OPEN,
34         CLOSED,
35     }
36
37     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
38     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
39             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
40     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
41             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
42
43     @GuardedBy("this")
44     private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
45     @GuardedBy("this")
46     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
47
48     private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
49     private final DistributedDataStoreClientBehavior client;
50     private final LocalHistoryIdentifier identifier;
51
52     // Used via NEXT_TX_UPDATER
53     @SuppressWarnings("unused")
54     private volatile long nextTx = 0;
55
56     private volatile State state = State.IDLE;
57
58     AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
59         this.client = Preconditions.checkNotNull(client);
60         this.identifier = Preconditions.checkNotNull(identifier);
61         Preconditions.checkArgument(identifier.getCookie() == 0);
62     }
63
64     final State state() {
65         return state;
66     }
67
68     final void updateState(final State expected, final State next) {
69         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
70         Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
71     }
72
73     @Override
74     public final LocalHistoryIdentifier getIdentifier() {
75         return identifier;
76     }
77
78     final DistributedDataStoreClientBehavior getClient() {
79         return client;
80     }
81
82     final long nextTx() {
83         return NEXT_TX_UPDATER.getAndIncrement(this);
84     }
85
86     @Override
87     final void localAbort(final Throwable cause) {
88         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
89         if (oldState != State.CLOSED) {
90             LOG.debug("Force-closing history {}", getIdentifier(), cause);
91
92             synchronized (this) {
93                 for (ClientTransaction t : openTransactions.values()) {
94                     t.localAbort(cause);
95                 }
96                 openTransactions.clear();
97                 readyTransactions.clear();
98             }
99         }
100     }
101
102     private AbstractProxyHistory createHistoryProxy(final Long shard) {
103         return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
104             identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard));
105     }
106
107     abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
108             final Optional<ShardBackendInfo> backendInfo);
109
110     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
111         final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
112         return history.createTransactionProxy(transactionId);
113     }
114
115     public final ClientTransaction createTransaction() {
116         Preconditions.checkState(state != State.CLOSED);
117
118         synchronized (this) {
119             final ClientTransaction ret = doCreateTransaction();
120             openTransactions.put(ret.getIdentifier(), ret);
121             return ret;
122         }
123     }
124
125     @GuardedBy("this")
126     abstract ClientTransaction doCreateTransaction();
127
128     /**
129      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
130      *
131      * @param txId Transaction identifier
132      * @param cohort Transaction commit cohort
133      */
134     synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
135             final AbstractTransactionCommitCohort cohort) {
136         final ClientTransaction tx = openTransactions.remove(txId);
137         Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
138
139         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
140         Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
141                 cohort, txId, previous);
142
143         return cohort;
144     }
145
146     /**
147      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
148      * backend.
149      *
150      * @param txId transaction identifier
151      */
152     synchronized void onTransactionAbort(final TransactionIdentifier txId) {
153         if (openTransactions.remove(txId) == null) {
154             LOG.warn("Could not find aborting transaction {}", txId);
155         }
156     }
157
158     /**
159      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
160      * and all its state can be removed.
161      *
162      * @param txId transaction identifier
163      */
164     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
165         if (readyTransactions.remove(txId) == null) {
166             LOG.warn("Could not find completed transaction {}", txId);
167         }
168     }
169 }