2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
27 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
31 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.Request;
33 import org.opendaylight.controller.cluster.access.concepts.RequestException;
34 import org.opendaylight.controller.cluster.access.concepts.Response;
35 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
36 import org.opendaylight.yangtools.concepts.Identifiable;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Per-connection representation of a local history. This class handles state replication across a single connection.
45 * @author Robert Varga
47 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
48 private abstract static class AbstractLocal extends ProxyHistory {
49 private final DataTree dataTree;
51 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
52 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
53 super(parent, connection, identifier);
54 this.dataTree = Preconditions.checkNotNull(dataTree);
57 final DataTreeSnapshot takeSnapshot() {
58 return dataTree.takeSnapshot();
62 private abstract static class AbstractRemote extends ProxyHistory {
63 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
64 final LocalHistoryIdentifier identifier) {
65 super(parent, connection, identifier);
69 private static final class Local extends AbstractLocal {
70 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
71 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
73 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
74 // the open one and attempts to create a new transaction again.
75 private LocalReadWriteProxyTransaction lastOpen;
77 private volatile LocalReadWriteProxyTransaction lastSealed;
79 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
80 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
81 super(parent, connection, identifier, dataTree);
85 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
86 final TransactionIdentifier txId, final boolean snapshotOnly) {
87 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
89 // onTransactionCompleted() runs concurrently
90 final LocalReadWriteProxyTransaction localSealed = lastSealed;
91 final DataTreeSnapshot baseSnapshot;
92 if (localSealed != null) {
93 baseSnapshot = localSealed.getSnapshot();
95 baseSnapshot = takeSnapshot();
99 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
102 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
103 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
108 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
109 return createClient(parent(), connection, getIdentifier());
113 void onTransactionAborted(final AbstractProxyTransaction tx) {
114 if (tx.equals(lastOpen)) {
120 void onTransactionCompleted(final AbstractProxyTransaction tx) {
121 Verify.verify(tx instanceof LocalProxyTransaction);
122 if (tx instanceof LocalReadWriteProxyTransaction) {
123 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
124 LOG.debug("Completed last sealed transaction {}", tx);
130 void onTransactionSealed(final AbstractProxyTransaction tx) {
131 Preconditions.checkState(tx.equals(lastOpen));
132 lastSealed = lastOpen;
137 private static final class LocalSingle extends AbstractLocal {
138 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
139 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
140 super(parent, connection, identifier, dataTree);
144 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
145 final TransactionIdentifier txId, final boolean snapshotOnly) {
146 final DataTreeSnapshot snapshot = takeSnapshot();
147 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
148 new LocalReadWriteProxyTransaction(this, txId, snapshot);
152 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
153 return createSingle(parent(), connection, getIdentifier());
157 private static final class Remote extends AbstractRemote {
158 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
159 final LocalHistoryIdentifier identifier) {
160 super(parent, connection, identifier);
164 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
165 final TransactionIdentifier txId, final boolean snapshotOnly) {
166 return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
170 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
171 return createClient(parent(), connection, getIdentifier());
175 private static final class RemoteSingle extends AbstractRemote {
176 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
177 final LocalHistoryIdentifier identifier) {
178 super(parent, connection, identifier);
182 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
183 final TransactionIdentifier txId, final boolean snapshotOnly) {
184 return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
188 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
189 return createSingle(parent(), connection, getIdentifier());
193 private static final class RequestReplayException extends RequestException {
194 private static final long serialVersionUID = 1L;
196 RequestReplayException(final String format, final Object... args) {
197 super(String.format(format, args));
201 public boolean isRetriable() {
206 private final class ReconnectCohort extends ProxyReconnectCohort {
208 public LocalHistoryIdentifier getIdentifier() {
214 void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
215 // First look for our Create message
216 for (ConnectionEntry e : previousEntries) {
217 final Request<?, ?> req = e.getRequest();
218 if (identifier.equals(req.getTarget())) {
219 Verify.verify(req instanceof LocalHistoryRequest);
220 if (req instanceof CreateLocalHistoryRequest) {
221 successor.connection.sendRequest(req, e.getCallback());
227 for (AbstractProxyTransaction t : proxies.values()) {
228 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
229 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
231 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
232 t.replayMessages(newProxy, previousEntries);
235 // Now look for any finalizing messages
236 for (ConnectionEntry e : previousEntries) {
237 final Request<?, ?> req = e.getRequest();
238 if (identifier.equals(req.getTarget())) {
239 Verify.verify(req instanceof LocalHistoryRequest);
240 successor.connection.sendRequest(req, e.getCallback());
247 ProxyHistory finishReconnect() {
248 final ProxyHistory ret = Verify.verifyNotNull(successor);
250 for (AbstractProxyTransaction t : proxies.values()) {
254 LOG.debug("Finished reconnecting proxy history {}", this);
260 void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
261 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
262 // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
263 // period required to get into the queue.
264 if (request instanceof TransactionRequest) {
265 forwardTransactionRequest((TransactionRequest<?>) request, callback);
266 } else if (request instanceof LocalHistoryRequest) {
267 forwardTo.accept(request, callback);
269 throw new IllegalArgumentException("Unhandled request " + request);
273 private void forwardTransactionRequest(final TransactionRequest<?> request,
274 final Consumer<Response<?, ?>> callback) throws RequestException {
276 final AbstractProxyTransaction proxy;
279 proxy = proxies.get(request.getTarget());
284 throw new RequestReplayException("Failed to find proxy for %s", request);
287 proxy.forwardRequest(request, callback);
291 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
293 private final Lock lock = new ReentrantLock();
294 private final LocalHistoryIdentifier identifier;
295 private final AbstractClientConnection<ShardBackendInfo> connection;
296 private final AbstractClientHistory parent;
299 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
301 private ProxyHistory successor;
303 private ProxyHistory(final AbstractClientHistory parent,
304 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
305 this.parent = Preconditions.checkNotNull(parent);
306 this.connection = Preconditions.checkNotNull(connection);
307 this.identifier = Preconditions.checkNotNull(identifier);
310 static ProxyHistory createClient(final AbstractClientHistory parent,
311 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
312 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
313 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
314 : new Remote(parent, connection, identifier);
317 static ProxyHistory createSingle(final AbstractClientHistory parent,
318 final AbstractClientConnection<ShardBackendInfo> connection,
319 final LocalHistoryIdentifier identifier) {
320 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
321 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
322 : new RemoteSingle(parent, connection, identifier);
326 public LocalHistoryIdentifier getIdentifier() {
330 final ActorRef localActor() {
331 return connection.localActor();
334 final AbstractClientHistory parent() {
338 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
339 final boolean snapshotOnly) {
342 if (successor != null) {
343 return successor.createTransactionProxy(txId, snapshotOnly);
346 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
347 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
348 proxies.put(proxyId, ret);
349 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
356 final void abortTransaction(final AbstractProxyTransaction tx) {
359 proxies.remove(tx.getIdentifier());
360 LOG.debug("Proxy {} aborting transaction {}", this, tx);
361 onTransactionAborted(tx);
367 final void completeTransaction(final AbstractProxyTransaction tx) {
370 proxies.remove(tx.getIdentifier());
371 LOG.debug("Proxy {} completing transaction {}", this, tx);
372 onTransactionCompleted(tx);
381 if (successor != null) {
386 LOG.debug("Proxy {} invoking destroy", this);
387 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
388 this::onDestroyComplete);
394 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
395 connection.sendRequest(request, callback);
399 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
400 TransactionIdentifier txId, boolean snapshotOnly);
402 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
404 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
405 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
407 if (successor != null) {
409 throw new IllegalStateException("Proxy history " + this + " already has a successor");
412 successor = createSuccessor(newConnection);
413 LOG.debug("History {} instantiated successor {}", this, successor);
415 for (AbstractProxyTransaction t : proxies.values()) {
419 return new ReconnectCohort();
422 private void onDestroyComplete(final Response<?, ?> response) {
423 LOG.debug("Proxy {} destroy completed with {}", this, response);
427 parent.onProxyDestroyed(this);
428 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
429 this::onPurgeComplete);
435 private void onPurgeComplete(final Response<?, ?> response) {
436 LOG.debug("Proxy {} purge completed with {}", this, response);
440 void onTransactionAborted(final AbstractProxyTransaction tx) {
441 // No-op for most implementations
445 void onTransactionCompleted(final AbstractProxyTransaction tx) {
446 // No-op for most implementations
449 void onTransactionSealed(final AbstractProxyTransaction tx) {
450 // No-op on most implementations