/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
27 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
29 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.Request;
31 import org.opendaylight.controller.cluster.access.concepts.RequestException;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.yangtools.concepts.Identifiable;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
/**
 * Per-connection representation of a local history. This class handles state replication across a single connection.
 *
 * @author Robert Varga
 */
46 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
47 private abstract static class AbstractLocal extends ProxyHistory {
48 private final DataTree dataTree;
50 AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
51 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
52 super(connection, identifier);
53 this.dataTree = Preconditions.checkNotNull(dataTree);
56 final DataTreeSnapshot takeSnapshot() {
57 return dataTree.takeSnapshot();
61 private abstract static class AbstractRemote extends ProxyHistory {
62 AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
63 final LocalHistoryIdentifier identifier) {
64 super(connection, identifier);
68 final AbstractProxyTransaction doCreateTransactionProxy(
69 final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
70 return new RemoteProxyTransaction(this, txId);
// Data-tree-backed history that chains transactions: a new transaction is based on the snapshot of the last
// sealed transaction when one is recorded, and on a fresh data tree snapshot otherwise.
// NOTE(review): every line of this listing is prefixed with its original line number, and gaps in that numbering
// show that lines were dropped (closing braces, annotations, an else and some statements). The notes below mark
// the gaps that appear to have held statements.
74 private static final class Local extends AbstractLocal {
// Atomic updater for the volatile lastSealed field; onTransactionCompleted() uses it to clear the field only
// when it still refers to the transaction being completed.
75 private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
76 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
78 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
79 // the open one and attempts to create a new transaction again.
80 private LocalProxyTransaction lastOpen;
82 private volatile LocalProxyTransaction lastSealed;
84 Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
85 final DataTree dataTree) {
86 super(connection, identifier, dataTree);
// Creates the next transaction of the chain and records it as lastOpen. Throws IllegalStateException (via
// checkState) while a previous transaction is still recorded as open.
90 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
91 final TransactionIdentifier txId) {
92 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
// lastSealed is read exactly once into a local so that the null check and the use see the same value.
94 // onTransactionCompleted() runs concurrently
95 final LocalProxyTransaction localSealed = lastSealed;
96 final DataTreeSnapshot baseSnapshot;
97 if (localSealed != null) {
98 baseSnapshot = localSealed.getSnapshot();
// NOTE(review): the `else` separating the two assignments (original line 99) is absent from this listing.
100 baseSnapshot = takeSnapshot();
103 lastOpen = new LocalProxyTransaction(this, txId,
104 (CursorAwareDataTreeModification) baseSnapshot.newModification());
105 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
// NOTE(review): the return statement (original line 106) is absent; presumably it returns lastOpen - confirm.
// The successor is built through createClient(), so it may be either a Local or a Remote history depending on
// what the new connection's backend info supplies.
110 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
111 return createClient(connection, getIdentifier());
// Only the currently open transaction may be aborted; anything else fails the checkState.
115 void onTransactionAborted(final AbstractProxyTransaction tx) {
116 Preconditions.checkState(tx.equals(lastOpen));
// NOTE(review): original line 117 is absent; presumably it clears lastOpen, since doCreateTransactionProxy()
// requires lastOpen == null before it will create another transaction - confirm.
// Clears lastSealed, but only if it still points at the completed transaction; a newer sealed transaction is
// left in place.
121 void onTransactionCompleted(final AbstractProxyTransaction tx) {
122 Verify.verify(tx instanceof LocalProxyTransaction);
124 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
125 LOG.debug("Completed last sealed transaction {}", tx);
// Records the open transaction as the last sealed one, making its snapshot the base for the next transaction.
130 void onTransactionSealed(final AbstractProxyTransaction tx) {
131 Preconditions.checkState(tx.equals(lastOpen));
132 lastSealed = lastOpen;
// NOTE(review): original line 133 is absent; presumably it clears lastOpen so a new transaction can be opened
// - confirm.
137 private static final class LocalSingle extends AbstractLocal {
138 LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
139 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
140 super(connection, identifier, dataTree);
144 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
145 final TransactionIdentifier txId) {
146 return new LocalProxyTransaction(this, txId,
147 (CursorAwareDataTreeModification) takeSnapshot().newModification());
151 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
152 return createSingle(connection, getIdentifier());
156 private static final class Remote extends AbstractRemote {
157 Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
158 super(connection, identifier);
162 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
163 return createClient(connection, getIdentifier());
167 private static final class RemoteSingle extends AbstractRemote {
168 RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
169 final LocalHistoryIdentifier identifier) {
170 super(connection, identifier);
174 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
175 return createSingle(connection, getIdentifier());
// Exception thrown by ReconnectCohort.replayTransactionRequest() when a request cannot be replayed because no
// transaction proxy is found for its target.
179 private static final class RequestReplayException extends RequestException {
180 private static final long serialVersionUID = 1L;
// The message is produced with String.format(format, args), so callers use %s-style placeholders.
182 RequestReplayException(final String format, final Object... args) {
183 super(String.format(format, args));
// NOTE(review): the return statement (original line 188) is absent from this listing, so whether this exception
// reports itself as retriable cannot be determined here - confirm against the complete source.
187 public boolean isRetriable() {
// Cohort returned by startReconnect(). It replays this history's requests and transactions onto the successor
// history. As a non-static inner class it works directly on the enclosing history's identifier, proxies and
// successor fields.
// NOTE(review): every line of this listing is prefixed with its original line number, and gaps in that numbering
// show that lines were dropped (closing braces, annotations and some statements). The notes below mark the gaps
// that appear to have held statements.
192 private final class ReconnectCohort extends ProxyReconnectCohort {
// NOTE(review): the body (original line 195) is absent; presumably it returns the enclosing history's
// identifier - confirm.
194 public LocalHistoryIdentifier getIdentifier() {
// Replays previously-successful requests onto the successor in three passes over previousEntries:
// the history's Create request, then every tracked transaction, then the history-level requests.
200 void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
201 // First look for our Create message
202 for (ConnectionEntry e : previousEntries) {
203 final Request<?, ?> req = e.getRequest();
204 if (identifier.equals(req.getTarget())) {
// Anything targeting the history identifier itself is expected to be a history-level request.
205 Verify.verify(req instanceof LocalHistoryRequest);
206 if (req instanceof CreateLocalHistoryRequest) {
207 successor.connection.sendRequest(req, e.getCallback());
// NOTE(review): original line 208 is absent; a `break` would match the "look for our Create message" intent
// - confirm.
// Second pass: allocate a counterpart for every tracked transaction on the successor and let the old proxy
// replay its messages into it.
213 for (AbstractProxyTransaction t : proxies.values()) {
214 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
215 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
216 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
217 t.replayMessages(newProxy, previousEntries);
// NOTE(review): as listed, this pass forwards every request targeting this history, which includes the
// CreateLocalHistoryRequest already forwarded by the first pass - confirm whether the duplicate is intended
// or should be filtered out.
220 // Now look for any finalizing messages
221 for (ConnectionEntry e : previousEntries) {
222 final Request<?, ?> req = e.getRequest();
223 if (identifier.equals(req.getTarget())) {
224 Verify.verify(req instanceof LocalHistoryRequest);
225 successor.connection.sendRequest(req, e.getCallback());
// Completes the reconnect; verifyNotNull fails if no successor has been set.
232 ProxyHistory finishReconnect() {
233 final ProxyHistory ret = Verify.verifyNotNull(successor);
235 for (AbstractProxyTransaction t : proxies.values()) {
// NOTE(review): the loop body (original line 236) is absent from this listing; what is done to each proxy
// cannot be determined here.
239 LOG.debug("Finished reconnecting proxy history {}", this);
// NOTE(review): original lines 240-241 are absent; presumably the history lock is released here (the
// suppression on startReconnect says it is "released asynchronously via the cohort") and ret is returned
// - confirm.
// Dispatches a single request being replayed: transaction requests go to the owning transaction proxy,
// history-level requests are handed to replayTo, and anything else is rejected.
245 void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
246 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
247 if (request instanceof TransactionRequest) {
248 replayTransactionRequest((TransactionRequest<?>) request, callback);
249 } else if (request instanceof LocalHistoryRequest) {
250 replayTo.accept(request, callback);
// NOTE(review): the `else` line (original line 251) is absent from this listing.
252 throw new IllegalArgumentException("Unhandled request " + request);
// Looks up the proxy registered for the request's target and forwards the request to it.
256 private void replayTransactionRequest(final TransactionRequest<?> request,
257 final Consumer<Response<?, ?>> callback) throws RequestException {
259 final AbstractProxyTransaction proxy;
// NOTE(review): original lines 260-261 and 263-266 are absent; presumably the lookup runs with the history
// lock held and the throw below is guarded by a null check on proxy - confirm.
262 proxy = proxies.get(request.getTarget());
267 throw new RequestReplayException("Failed to find proxy for %s", request);
270 proxy.replayRequest(request, callback);
274 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
276 private final Lock lock = new ReentrantLock();
277 private final LocalHistoryIdentifier identifier;
278 private final AbstractClientConnection<ShardBackendInfo> connection;
281 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
283 private ProxyHistory successor;
/**
 * Bind a new proxy history to a connection. The constructor is private, so only the nested subclasses can
 * invoke it.
 *
 * @param connection connection this history sends its requests through
 * @param identifier identifier of the history
 * @throws NullPointerException if either argument is null
 */
private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
        final LocalHistoryIdentifier identifier) {
    this.connection = Preconditions.checkNotNull(connection);
    this.identifier = Preconditions.checkNotNull(identifier);
}
291 static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
292 final LocalHistoryIdentifier identifier) {
293 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
294 return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
295 : new Remote(connection, identifier);
298 static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
299 final LocalHistoryIdentifier identifier) {
300 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
301 return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
302 : new RemoteSingle(connection, identifier);
// Identifiable accessor for the local history this proxy represents.
// NOTE(review): the method body (original line 307) is absent from this listing; presumably it returns the
// identifier field - confirm.
306 public LocalHistoryIdentifier getIdentifier() {
310 final ActorRef localActor() {
311 return connection.localActor();
// Allocates a transaction proxy for txId on this history. Once a successor history exists, the call is delegated
// to it instead.
314 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
// NOTE(review): original lines 315-316 are absent from this listing; presumably the history lock is acquired
// and a try block opened here - confirm.
317 if (successor != null) {
318 return successor.createTransactionProxy(txId);
// The proxy is keyed by a new identifier built from this history's identifier and the transaction id taken
// from txId; the subclass decides which kind of proxy to instantiate.
321 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
322 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
323 proxies.put(proxyId, ret);
324 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
// NOTE(review): original lines 325-329 are absent; presumably ret is returned and the lock released in a
// finally block - confirm.
// Stops tracking an aborted transaction and notifies the subclass through onTransactionAborted().
// NOTE(review): original lines 332-333 and 337-340 are absent from this listing; presumably the body runs with
// the history lock held and releases it in a finally block - confirm.
331 final void abortTransaction(final AbstractProxyTransaction tx) {
334 proxies.remove(tx.getIdentifier());
335 LOG.debug("Proxy {} aborting transaction {}", this, tx);
336 onTransactionAborted(tx);
// Stops tracking a completed transaction and notifies the subclass through onTransactionCompleted().
// NOTE(review): original lines 343-344 and 348-351 are absent from this listing; presumably the body runs with
// the history lock held and releases it in a finally block - confirm.
342 final void completeTransaction(final AbstractProxyTransaction tx) {
345 proxies.remove(tx.getIdentifier());
346 LOG.debug("Proxy {} completing transaction {}", this, tx);
347 onTransactionCompleted(tx);
353 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
354 connection.sendRequest(request, callback);
358 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
359 TransactionIdentifier txId);
361 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
// Begins moving this history to a new connection: instantiates the successor history and returns the cohort
// that drives the rest of the reconnect. Throws IllegalStateException if a successor already exists.
363 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
364 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
// NOTE(review): original line 365 is absent from this listing; going by the suppression justification above,
// the history lock is presumably acquired here and deliberately left held on the normal return path - confirm.
366 if (successor != null) {
// NOTE(review): original line 367 is absent; presumably the lock is released before throwing - confirm.
368 throw new IllegalStateException("Proxy history " + this + " already has a successor");
371 successor = createSuccessor(newConnection);
372 LOG.debug("History {} instantiated successor {}", this, successor);
374 for (AbstractProxyTransaction t : proxies.values()) {
// NOTE(review): the loop body (original line 375) is absent from this listing; what is done to each tracked
// transaction at this point cannot be determined here.
378 return new ReconnectCohort();
382 void onTransactionAborted(final AbstractProxyTransaction tx) {
383 // No-op for most implementations
387 void onTransactionCompleted(final AbstractProxyTransaction tx) {
388 // No-op for most implementations
391 void onTransactionSealed(final AbstractProxyTransaction tx) {
392 // No-op on most implementations