2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Iterator;
15 import java.util.LinkedHashMap;
17 import java.util.Optional;
18 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
19 import java.util.concurrent.locks.Lock;
20 import java.util.concurrent.locks.ReentrantLock;
21 import java.util.function.BiConsumer;
22 import java.util.function.Consumer;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
27 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
32 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
33 import org.opendaylight.controller.cluster.access.concepts.Request;
34 import org.opendaylight.controller.cluster.access.concepts.RequestException;
35 import org.opendaylight.controller.cluster.access.concepts.Response;
36 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
37 import org.opendaylight.yangtools.concepts.Identifiable;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
44 * Per-connection representation of a local history. This class handles state replication across a single connection.
46 * @author Robert Varga
48 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
49 private abstract static class AbstractLocal extends ProxyHistory {
50 private final DataTree dataTree;
52 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
53 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
54 super(parent, connection, identifier);
55 this.dataTree = Preconditions.checkNotNull(dataTree);
58 final DataTreeSnapshot takeSnapshot() {
59 return dataTree.takeSnapshot();
63 private abstract static class AbstractRemote extends ProxyHistory {
64 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
65 final LocalHistoryIdentifier identifier) {
66 super(parent, connection, identifier);
70 private static final class Local extends AbstractLocal {
71 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
72 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
74 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
75 // the open one and attempts to create a new transaction again.
76 private LocalReadWriteProxyTransaction lastOpen;
78 private volatile LocalReadWriteProxyTransaction lastSealed;
80 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
81 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
82 super(parent, connection, identifier, dataTree);
86 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
87 final TransactionIdentifier txId, final boolean snapshotOnly) {
88 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
90 // onTransactionCompleted() runs concurrently
91 final LocalReadWriteProxyTransaction localSealed = lastSealed;
92 final DataTreeSnapshot baseSnapshot;
93 if (localSealed != null) {
94 baseSnapshot = localSealed.getSnapshot();
96 baseSnapshot = takeSnapshot();
100 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
103 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
104 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
109 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
110 return createClient(parent(), connection, getIdentifier());
114 void onTransactionAborted(final AbstractProxyTransaction tx) {
115 if (tx.equals(lastOpen)) {
121 void onTransactionCompleted(final AbstractProxyTransaction tx) {
122 Verify.verify(tx instanceof LocalProxyTransaction);
123 if (tx instanceof LocalReadWriteProxyTransaction) {
124 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
125 LOG.debug("Completed last sealed transaction {}", tx);
131 void onTransactionSealed(final AbstractProxyTransaction tx) {
132 Preconditions.checkState(tx.equals(lastOpen));
133 lastSealed = lastOpen;
138 private static final class LocalSingle extends AbstractLocal {
139 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
140 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
141 super(parent, connection, identifier, dataTree);
145 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
146 final TransactionIdentifier txId, final boolean snapshotOnly) {
147 final DataTreeSnapshot snapshot = takeSnapshot();
148 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
149 new LocalReadWriteProxyTransaction(this, txId, snapshot);
153 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
154 return createSingle(parent(), connection, getIdentifier());
158 private static final class Remote extends AbstractRemote {
159 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
160 final LocalHistoryIdentifier identifier) {
161 super(parent, connection, identifier);
165 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
166 final TransactionIdentifier txId, final boolean snapshotOnly) {
167 return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
171 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
172 return createClient(parent(), connection, getIdentifier());
176 private static final class RemoteSingle extends AbstractRemote {
177 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
178 final LocalHistoryIdentifier identifier) {
179 super(parent, connection, identifier);
183 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
184 final TransactionIdentifier txId, final boolean snapshotOnly) {
185 return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
189 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
190 return createSingle(parent(), connection, getIdentifier());
194 private static final class RequestReplayException extends RequestException {
195 private static final long serialVersionUID = 1L;
197 RequestReplayException(final String format, final Object... args) {
198 super(String.format(format, args));
202 public boolean isRetriable() {
207 private final class ReconnectCohort extends ProxyReconnectCohort {
209 public LocalHistoryIdentifier getIdentifier() {
215 void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
216 // First look for our Create message
217 Iterator<ConnectionEntry> it = previousEntries.iterator();
218 while (it.hasNext()) {
219 final ConnectionEntry e = it.next();
220 final Request<?, ?> req = e.getRequest();
221 if (identifier.equals(req.getTarget())) {
222 Verify.verify(req instanceof LocalHistoryRequest);
223 if (req instanceof CreateLocalHistoryRequest) {
224 successor.connection.sendRequest(req, e.getCallback());
231 for (AbstractProxyTransaction t : proxies.values()) {
232 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
233 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
235 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
236 t.replayMessages(newProxy, previousEntries);
239 // Now look for any finalizing messages
240 it = previousEntries.iterator();
241 while (it.hasNext()) {
242 final ConnectionEntry e = it.next();
243 final Request<?, ?> req = e.getRequest();
244 if (identifier.equals(req.getTarget())) {
245 Verify.verify(req instanceof LocalHistoryRequest);
246 if (req instanceof DestroyLocalHistoryRequest) {
247 successor.connection.sendRequest(req, e.getCallback());
257 ProxyHistory finishReconnect() {
258 final ProxyHistory ret = Verify.verifyNotNull(successor);
260 for (AbstractProxyTransaction t : proxies.values()) {
264 LOG.debug("Finished reconnecting proxy history {}", this);
270 void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
271 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
272 // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
273 // period required to get into the queue.
274 if (request instanceof TransactionRequest) {
275 forwardTransactionRequest((TransactionRequest<?>) request, callback);
276 } else if (request instanceof LocalHistoryRequest) {
277 forwardTo.accept(request, callback);
279 throw new IllegalArgumentException("Unhandled request " + request);
283 private void forwardTransactionRequest(final TransactionRequest<?> request,
284 final Consumer<Response<?, ?>> callback) throws RequestException {
286 final AbstractProxyTransaction proxy;
289 proxy = proxies.get(request.getTarget());
294 throw new RequestReplayException("Failed to find proxy for %s", request);
297 proxy.forwardRequest(request, callback);
301 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
303 private final Lock lock = new ReentrantLock();
304 private final LocalHistoryIdentifier identifier;
305 private final AbstractClientConnection<ShardBackendInfo> connection;
306 private final AbstractClientHistory parent;
309 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
311 private ProxyHistory successor;
313 private ProxyHistory(final AbstractClientHistory parent,
314 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
315 this.parent = Preconditions.checkNotNull(parent);
316 this.connection = Preconditions.checkNotNull(connection);
317 this.identifier = Preconditions.checkNotNull(identifier);
320 static ProxyHistory createClient(final AbstractClientHistory parent,
321 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
322 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
323 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
324 : new Remote(parent, connection, identifier);
327 static ProxyHistory createSingle(final AbstractClientHistory parent,
328 final AbstractClientConnection<ShardBackendInfo> connection,
329 final LocalHistoryIdentifier identifier) {
330 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
331 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
332 : new RemoteSingle(parent, connection, identifier);
336 public LocalHistoryIdentifier getIdentifier() {
340 final long currentTime() {
341 return connection.currentTime();
344 final ActorRef localActor() {
345 return connection.localActor();
348 final AbstractClientHistory parent() {
352 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
353 final boolean snapshotOnly) {
356 if (successor != null) {
357 return successor.createTransactionProxy(txId, snapshotOnly);
360 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
361 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
362 proxies.put(proxyId, ret);
363 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
370 final void abortTransaction(final AbstractProxyTransaction tx) {
373 proxies.remove(tx.getIdentifier());
374 LOG.debug("Proxy {} aborting transaction {}", this, tx);
375 onTransactionAborted(tx);
381 final void completeTransaction(final AbstractProxyTransaction tx) {
384 proxies.remove(tx.getIdentifier());
385 LOG.debug("Proxy {} completing transaction {}", this, tx);
386 onTransactionCompleted(tx);
395 if (successor != null) {
400 LOG.debug("Proxy {} invoking destroy", this);
401 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
402 this::onDestroyComplete);
408 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
409 final long enqueuedTicks) {
410 connection.enqueueRequest(request, callback, enqueuedTicks);
413 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
414 connection.sendRequest(request, callback);
418 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
419 TransactionIdentifier txId, boolean snapshotOnly);
421 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
423 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
424 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
426 if (successor != null) {
428 throw new IllegalStateException("Proxy history " + this + " already has a successor");
431 successor = createSuccessor(newConnection);
432 LOG.debug("History {} instantiated successor {}", this, successor);
434 for (AbstractProxyTransaction t : proxies.values()) {
438 return new ReconnectCohort();
441 private void onDestroyComplete(final Response<?, ?> response) {
442 LOG.debug("Proxy {} destroy completed with {}", this, response);
446 parent.onProxyDestroyed(this);
447 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
448 this::onPurgeComplete);
454 private void onPurgeComplete(final Response<?, ?> response) {
455 LOG.debug("Proxy {} purge completed with {}", this, response);
459 void onTransactionAborted(final AbstractProxyTransaction tx) {
460 // No-op for most implementations
464 void onTransactionCompleted(final AbstractProxyTransaction tx) {
465 // No-op for most implementations
468 void onTransactionSealed(final AbstractProxyTransaction tx) {
469 // No-op on most implementations