2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
26 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
27 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
28 import org.opendaylight.controller.cluster.access.concepts.Request;
29 import org.opendaylight.controller.cluster.access.concepts.RequestException;
30 import org.opendaylight.controller.cluster.access.concepts.Response;
31 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Per-connection representation of a local history. This class handles state replication across a single connection.
42 * @author Robert Varga
44 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
// Base class for history proxies that hold a DataTree and can snapshot it directly.
45 private abstract static class AbstractLocal extends ProxyHistory {
// Tree that takeSnapshot() reads from; the constructor rejects null.
46 private final DataTree dataTree;
// Forwards connection and identifier to ProxyHistory and stores the (non-null) data tree.
48 AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
49 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
50 super(connection, identifier);
51 this.dataTree = Preconditions.checkNotNull(dataTree);
// Returns the result of dataTree.takeSnapshot(); final, so subclasses cannot change the snapshot source.
54 final DataTreeSnapshot takeSnapshot() {
55 return dataTree.takeSnapshot();
// Base class for history proxies that have no DataTree; every transaction proxy is a RemoteProxyTransaction.
59 private abstract static class AbstractRemote extends ProxyHistory {
// Forwards connection and identifier to ProxyHistory; no additional state is kept.
60 AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
61 final LocalHistoryIdentifier identifier) {
62 super(connection, identifier);
// Creates a RemoteProxyTransaction bound to this history and txId; the connection argument is not used here.
66 final AbstractProxyTransaction doCreateTransactionProxy(
67 final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
68 return new RemoteProxyTransaction(this, txId);
72 private static final class Local extends AbstractLocal {
73 private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
74 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
76 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77 // the open one and attempts to create a new transaction again.
78 private LocalProxyTransaction lastOpen;
80 private volatile LocalProxyTransaction lastSealed;
82 Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
83 final DataTree dataTree) {
84 super(connection, identifier, dataTree);
88 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89 final TransactionIdentifier txId) {
90 Preconditions.checkState(lastOpen == null, "Proxy {} is currently open", lastOpen);
92 // onTransactionCompleted() runs concurrently
93 final LocalProxyTransaction localSealed = lastSealed;
94 final DataTreeSnapshot baseSnapshot;
95 if (localSealed != null) {
96 baseSnapshot = localSealed.getSnapshot();
98 baseSnapshot = takeSnapshot();
101 lastOpen = new LocalProxyTransaction(this, txId,
102 (CursorAwareDataTreeModification) baseSnapshot.newModification());
107 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
108 return createClient(connection, getIdentifier());
112 void onTransactionAborted(final AbstractProxyTransaction tx) {
113 Preconditions.checkState(tx.equals(lastOpen));
118 void onTransactionCompleted(final AbstractProxyTransaction tx) {
119 Verify.verify(tx instanceof LocalProxyTransaction);
121 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
122 LOG.debug("Completed last sealed transaction {}", tx);
127 void onTransactionSealed(final AbstractProxyTransaction tx) {
128 Preconditions.checkState(tx.equals(lastOpen));
129 lastSealed = lastOpen;
// DataTree-backed history proxy that keeps no per-transaction state: each transaction gets a fresh snapshot.
134 private static final class LocalSingle extends AbstractLocal {
// Forwards all arguments to AbstractLocal.
135 LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
136 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
137 super(connection, identifier, dataTree);
// Creates a LocalProxyTransaction over a modification of a newly taken snapshot; connection is not used here.
141 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
142 final TransactionIdentifier txId) {
143 return new LocalProxyTransaction(this, txId,
144 (CursorAwareDataTreeModification) takeSnapshot().newModification());
// The successor is built by createSingle() on the new connection with this history's identifier.
148 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
149 return createSingle(connection, getIdentifier());
// History proxy without a DataTree whose successor is created through createClient().
153 private static final class Remote extends AbstractRemote {
// Forwards both arguments to AbstractRemote.
154 Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
155 super(connection, identifier);
// The successor is built by createClient() on the new connection with this history's identifier.
159 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
160 return createClient(connection, getIdentifier());
// History proxy without a DataTree whose successor is created through createSingle().
164 private static final class RemoteSingle extends AbstractRemote {
// Forwards both arguments to AbstractRemote.
165 RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
166 final LocalHistoryIdentifier identifier) {
167 super(connection, identifier);
// The successor is built by createSingle() on the new connection with this history's identifier.
171 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
172 return createSingle(connection, getIdentifier());
// RequestException thrown when a request cannot be replayed; see replayTransactionRequest() for the one use here.
176 private static final class RequestReplayException extends RequestException {
177 private static final long serialVersionUID = 1L;
// The message is produced with String.format, so callers must use %-style placeholders (not SLF4J {}).
179 RequestReplayException(final String format, final Object... args) {
180 super(String.format(format, args));
// NOTE(review): the returned value is not visible in this listing; confirm against the full file.
184 public boolean isRetriable() {
// Inner (non-static) cohort returned by startReconnect(); it works on the enclosing history's proxies,
// identifier and successor fields.
189 private final class ReconnectCohort extends ProxyReconnectCohort {
191 public LocalHistoryIdentifier getIdentifier() {
// For every tracked transaction proxy, creates a proxy with the same identifier on the successor history
// and asks the old proxy to replay its successful requests into it.
197 void replaySuccessfulRequests() {
198 for (AbstractProxyTransaction t : proxies.values()) {
199 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
200 LOG.debug("{} created successor transaction proxy {} for {}", identifier, newProxy, t);
201 t.replaySuccessfulRequests(newProxy);
// Verifies that a successor has been set; the remaining statements of this method are not visible here.
207 ProxyHistory finishReconnect() {
208 final ProxyHistory ret = Verify.verifyNotNull(successor);
// NOTE(review): `this` is the ReconnectCohort instance, not the enclosing ProxyHistory -- confirm the intended subject.
209 LOG.debug("Finished reconnecting proxy history {}", this);
// Dispatches a replayed request by type: transaction requests go to the owning transaction proxy, local
// history requests are handed to replayTo unchanged, and any other request type is rejected.
215 void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
216 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
217 if (request instanceof TransactionRequest) {
218 replayTransactionRequest((TransactionRequest<?>) request, callback);
219 } else if (request instanceof LocalHistoryRequest) {
220 replayTo.accept(request, callback);
222 throw new IllegalArgumentException("Unhandled request " + request);
// Looks up the proxy registered under the request's target identifier and replays the request through it.
// A RequestReplayException reports a missing proxy (the guarding condition is not visible in this listing).
226 private void replayTransactionRequest(final TransactionRequest<?> request,
227 final Consumer<Response<?, ?>> callback) throws RequestException {
229 final AbstractProxyTransaction proxy;
232 proxy = proxies.get(request.getTarget());
237 throw new RequestReplayException("Failed to find proxy for %s", request);
240 proxy.replayRequest(request, callback);
// Logger shared by ProxyHistory and all of its nested classes.
244 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
// NOTE(review): presumably guards proxies and successor; the lock/unlock calls are not visible in this listing.
246 private final Lock lock = new ReentrantLock();
// Identifier of this history; also used as the history part of every proxy transaction identifier.
247 private final LocalHistoryIdentifier identifier;
// Connection used for sending requests and for obtaining the local actor.
248 private final AbstractClientConnection<ShardBackendInfo> connection;
// Transaction proxies by identifier; LinkedHashMap, so iteration follows insertion order.
251 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
// Set once by startReconnect(); null until then.
253 private ProxyHistory successor;
// Private constructor: only the nested subclasses can instantiate. Both arguments must be non-null.
255 private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
256 final LocalHistoryIdentifier identifier) {
257 this.connection = Preconditions.checkNotNull(connection);
258 this.identifier = Preconditions.checkNotNull(identifier);
// Factory: returns a Local proxy when the connection's backend info exposes a DataTree, otherwise a Remote proxy.
261 static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
262 final LocalHistoryIdentifier identifier) {
// Empty when either the backend info or its data tree is absent.
263 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
264 return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
265 : new Remote(connection, identifier);
// Factory: returns a LocalSingle proxy when the connection's backend info exposes a DataTree, otherwise a
// RemoteSingle proxy.
268 static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
269 final LocalHistoryIdentifier identifier) {
// Empty when either the backend info or its data tree is absent.
270 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
271 return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
272 : new RemoteSingle(connection, identifier);
// Identifiable accessor for this history's identifier (body not visible in this listing; presumably returns
// the identifier field -- confirm against the full file).
276 public LocalHistoryIdentifier getIdentifier() {
// Returns the local actor reference obtained from the underlying connection.
280 final ActorRef localActor() {
281 return connection.localActor();
// Creates and registers a transaction proxy for txId. The proxy gets its own identifier, combining this
// history's identifier with the transaction number of txId.
284 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
285 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
// Subclass decides the concrete proxy type; the result is tracked under proxyId, not under txId.
289 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
290 proxies.put(proxyId, ret);
291 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
// Stops tracking tx by removing it from proxies (any further statements are not visible in this listing).
298 final void abortTransaction(final AbstractProxyTransaction tx) {
301 proxies.remove(tx.getIdentifier());
// Stops tracking tx by removing it from proxies (any further statements are not visible in this listing).
307 final void completeTransaction(final AbstractProxyTransaction tx) {
310 proxies.remove(tx.getIdentifier());
// Sends a transaction request over this history's connection; callback is passed through to the connection.
316 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
317 connection.sendRequest(request, callback);
// Subclass hook called by createTransactionProxy() with this history's connection and the proxy identifier;
// returns the concrete transaction proxy to register.
321 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
322 TransactionIdentifier txId);
// Subclass hook called by startReconnect(); returns the history proxy that replaces this one on the given connection.
324 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
// Begins moving this history to newConnection: creates the successor history and returns a cohort that
// drives the replay. Throws IllegalStateException if a successor already exists.
// Per the FindBugs suppression below, the lock is not released in this method but later via the cohort
// (the lock calls themselves are not visible in this listing).
326 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
327 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
329 if (successor != null) {
331 throw new IllegalStateException("Proxy history " + this + " already has a successor");
334 successor = createSuccessor(newConnection);
335 return new ReconnectCohort();
// Hook for reacting to an aborted transaction; empty by default and overridden by Local.
339 void onTransactionAborted(final AbstractProxyTransaction tx) {
340 // No-op for most implementations
// Hook for reacting to a completed transaction; empty by default and overridden by Local.
344 void onTransactionCompleted(final AbstractProxyTransaction tx) {
345 // No-op for most implementations
// Hook for reacting to a sealed transaction; empty by default and overridden by Local.
348 void onTransactionSealed(final AbstractProxyTransaction tx) {
349 // No-op for most implementations