2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
26 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
27 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
28 import org.opendaylight.controller.cluster.access.concepts.Request;
29 import org.opendaylight.controller.cluster.access.concepts.RequestException;
30 import org.opendaylight.controller.cluster.access.concepts.Response;
31 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Per-connection representation of a local history. This class handles state replication across a single connection.
42 * @author Robert Varga
44 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
45 private abstract static class AbstractLocal extends ProxyHistory {
46 private final DataTree dataTree;
48 AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
49 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
50 super(connection, identifier);
51 this.dataTree = Preconditions.checkNotNull(dataTree);
54 final DataTreeSnapshot takeSnapshot() {
55 return dataTree.takeSnapshot();
59 private abstract static class AbstractRemote extends ProxyHistory {
60 AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
61 final LocalHistoryIdentifier identifier) {
62 super(connection, identifier);
66 final AbstractProxyTransaction doCreateTransactionProxy(
67 final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
68 return new RemoteProxyTransaction(this, txId);
72 private static final class Local extends AbstractLocal {
73 private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
74 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
76 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77 // the open one and attempts to create a new transaction again.
78 private LocalProxyTransaction lastOpen;
80 private volatile LocalProxyTransaction lastSealed;
82 Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
83 final DataTree dataTree) {
84 super(connection, identifier, dataTree);
88 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89 final TransactionIdentifier txId) {
90 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
92 // onTransactionCompleted() runs concurrently
93 final LocalProxyTransaction localSealed = lastSealed;
94 final DataTreeSnapshot baseSnapshot;
95 if (localSealed != null) {
96 baseSnapshot = localSealed.getSnapshot();
98 baseSnapshot = takeSnapshot();
101 lastOpen = new LocalProxyTransaction(this, txId,
102 (CursorAwareDataTreeModification) baseSnapshot.newModification());
103 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
108 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
109 return createClient(connection, getIdentifier());
113 void onTransactionAborted(final AbstractProxyTransaction tx) {
114 Preconditions.checkState(tx.equals(lastOpen));
119 void onTransactionCompleted(final AbstractProxyTransaction tx) {
120 Verify.verify(tx instanceof LocalProxyTransaction);
122 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
123 LOG.debug("Completed last sealed transaction {}", tx);
128 void onTransactionSealed(final AbstractProxyTransaction tx) {
129 Preconditions.checkState(tx.equals(lastOpen));
130 lastSealed = lastOpen;
135 private static final class LocalSingle extends AbstractLocal {
136 LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
137 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
138 super(connection, identifier, dataTree);
142 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
143 final TransactionIdentifier txId) {
144 return new LocalProxyTransaction(this, txId,
145 (CursorAwareDataTreeModification) takeSnapshot().newModification());
149 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
150 return createSingle(connection, getIdentifier());
154 private static final class Remote extends AbstractRemote {
155 Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
156 super(connection, identifier);
160 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
161 return createClient(connection, getIdentifier());
165 private static final class RemoteSingle extends AbstractRemote {
166 RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
167 final LocalHistoryIdentifier identifier) {
168 super(connection, identifier);
172 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
173 return createSingle(connection, getIdentifier());
177 private static final class RequestReplayException extends RequestException {
178 private static final long serialVersionUID = 1L;
180 RequestReplayException(final String format, final Object... args) {
181 super(String.format(format, args));
185 public boolean isRetriable() {
190 private final class ReconnectCohort extends ProxyReconnectCohort {
192 public LocalHistoryIdentifier getIdentifier() {
198 void replaySuccessfulRequests() {
199 for (AbstractProxyTransaction t : proxies.values()) {
200 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
201 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
202 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
203 t.startReconnect(newProxy);
209 ProxyHistory finishReconnect() {
210 final ProxyHistory ret = Verify.verifyNotNull(successor);
212 for (AbstractProxyTransaction t : proxies.values()) {
216 LOG.debug("Finished reconnecting proxy history {}", this);
222 void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
223 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
224 if (request instanceof TransactionRequest) {
225 replayTransactionRequest((TransactionRequest<?>) request, callback);
226 } else if (request instanceof LocalHistoryRequest) {
227 replayTo.accept(request, callback);
229 throw new IllegalArgumentException("Unhandled request " + request);
233 private void replayTransactionRequest(final TransactionRequest<?> request,
234 final Consumer<Response<?, ?>> callback) throws RequestException {
236 final AbstractProxyTransaction proxy;
239 proxy = proxies.get(request.getTarget());
244 throw new RequestReplayException("Failed to find proxy for %s", request);
247 proxy.replayRequest(request, callback);
251 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
253 private final Lock lock = new ReentrantLock();
254 private final LocalHistoryIdentifier identifier;
255 private final AbstractClientConnection<ShardBackendInfo> connection;
258 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
260 private ProxyHistory successor;
262 private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
263 final LocalHistoryIdentifier identifier) {
264 this.connection = Preconditions.checkNotNull(connection);
265 this.identifier = Preconditions.checkNotNull(identifier);
268 static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
269 final LocalHistoryIdentifier identifier) {
270 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
271 return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
272 : new Remote(connection, identifier);
275 static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
276 final LocalHistoryIdentifier identifier) {
277 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
278 return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
279 : new RemoteSingle(connection, identifier);
283 public LocalHistoryIdentifier getIdentifier() {
287 final ActorRef localActor() {
288 return connection.localActor();
291 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
294 if (successor != null) {
295 return successor.createTransactionProxy(txId);
298 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
299 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
300 proxies.put(proxyId, ret);
301 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
308 final void abortTransaction(final AbstractProxyTransaction tx) {
311 proxies.remove(tx.getIdentifier());
312 LOG.debug("Proxy {} aborting transaction {}", this, tx);
313 onTransactionAborted(tx);
319 final void completeTransaction(final AbstractProxyTransaction tx) {
322 proxies.remove(tx.getIdentifier());
323 LOG.debug("Proxy {} completing transaction {}", this, tx);
324 onTransactionCompleted(tx);
330 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
331 connection.sendRequest(request, callback);
335 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
336 TransactionIdentifier txId);
338 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
340 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
341 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
343 if (successor != null) {
345 throw new IllegalStateException("Proxy history " + this + " already has a successor");
348 successor = createSuccessor(newConnection);
349 LOG.debug("History {} instantiated successor {}", this, successor);
350 return new ReconnectCohort();
354 void onTransactionAborted(final AbstractProxyTransaction tx) {
355 // No-op for most implementations
359 void onTransactionCompleted(final AbstractProxyTransaction tx) {
360 // No-op for most implementations
363 void onTransactionSealed(final AbstractProxyTransaction tx) {
364 // No-op on most implementations