2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSelection;
13 import java.util.Collection;
14 import java.util.Optional;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18 import org.eclipse.jdt.annotation.NonNull;
19 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
20 import org.opendaylight.controller.cluster.access.concepts.MemberName;
21 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
24 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
25 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
26 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
27 import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31 import scala.util.Try;
34 * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
35 * transaction factories.
37 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
38 private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
39 @SuppressWarnings("rawtypes")
40 private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
41 AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
43 private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
44 private final @NonNull LocalHistoryIdentifier historyId;
45 private final @NonNull ActorUtils actorUtils;
47 // Used via TX_COUNTER_UPDATER
48 @SuppressWarnings("unused")
49 private volatile long nextTx;
51 protected AbstractTransactionContextFactory(final ActorUtils actorUtils, final LocalHistoryIdentifier historyId) {
52 this.actorUtils = requireNonNull(actorUtils);
53 this.historyId = requireNonNull(historyId);
56 final ActorUtils getActorUtils() {
60 final LocalHistoryIdentifier getHistoryId() {
64 @SuppressWarnings("checkstyle:IllegalCatch")
65 private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
66 final String shardName) {
67 final LocalTransactionFactory local = knownLocal.get(shardName);
69 LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
73 return createLocalTransactionContext(local, parent);
74 } catch (Exception e) {
75 return new NoOpTransactionContext(e, parent.getIdentifier());
82 private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
83 final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
84 final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
85 LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
86 parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
88 updateShardInfo(shardName, primaryShardInfo);
90 final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
92 if (localContext != null) {
93 LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
94 parent.getIdentifier());
95 return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
99 LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
100 parent.getIdentifier());
101 final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
102 transactionContextWrapper, parent, shardName);
103 remote.setPrimaryShard(primaryShardInfo);
104 return transactionContextWrapper;
106 onTransactionContextCreated(parent.getIdentifier());
110 private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
111 final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
112 LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
113 primaryShardInfo.getPrimaryShardActor(), shardName);
115 updateShardInfo(shardName, primaryShardInfo);
117 final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
119 if (localContext != null) {
120 transactionContextWrapper.executePriorTransactionOperations(localContext);
122 final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
123 transactionContextWrapper, parent, shardName);
124 remote.setPrimaryShard(primaryShardInfo);
127 onTransactionContextCreated(parent.getIdentifier());
131 private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
132 final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
133 LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
136 transactionContextWrapper.executePriorTransactionOperations(
137 new NoOpTransactionContext(failure, parent.getIdentifier()));
139 onTransactionContextCreated(parent.getIdentifier());
143 final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
144 final String shardName) {
145 final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
146 parent.getIdentifier(), actorUtils, shardName);
147 final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
148 if (findPrimaryFuture.isCompleted()) {
149 final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
150 if (maybe.isSuccess()) {
151 return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName, contextWrapper);
154 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, contextWrapper);
156 findPrimaryFuture.onComplete(result -> {
157 if (result.isSuccess()) {
158 onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
160 onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
163 }, actorUtils.getClientDispatcher());
165 return contextWrapper;
168 private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
169 final Optional<ReadOnlyDataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
170 if (maybeDataTree.isPresent()) {
171 if (!knownLocal.containsKey(shardName)) {
172 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
174 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
175 knownLocal.putIfAbsent(shardName, factory);
177 } else if (knownLocal.containsKey(shardName)) {
178 LOG.debug("Shard {} invalidating local data tree", shardName);
180 knownLocal.remove(shardName);
184 protected final MemberName getMemberName() {
185 return historyId.getClientId().getFrontendId().getMemberName();
189 * Create an identifier for the next TransactionProxy attached to this component
191 * @return Transaction identifier, may not be null.
193 protected final TransactionIdentifier nextIdentifier() {
194 return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
198 * Find the primary shard actor.
200 * @param shardName Shard name
201 * @return Future containing shard information.
203 protected abstract Future<PrimaryShardInfo> findPrimaryShard(@NonNull String shardName,
204 @NonNull TransactionIdentifier txId);
207 * Create local transaction factory for specified shard, backed by specified shard leader
208 * and data tree instance.
210 * @param shardName the shard name
211 * @param shardLeader the shard leader
212 * @param dataTree Backing data tree instance. The data tree may only be accessed in
214 * @return Transaction factory for local use.
216 protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, ReadOnlyDataTree dataTree);
219 * Callback invoked from child transactions to push any futures, which need to
220 * be waited for before the next transaction is allocated.
221 * @param cohortFutures Collection of futures
223 protected abstract <T> void onTransactionReady(@NonNull TransactionIdentifier transaction,
224 @NonNull Collection<Future<T>> cohortFutures);
227 * Callback invoked when the internal TransactionContext has been created for a transaction.
229 * @param transactionId the ID of the transaction.
231 protected abstract void onTransactionContextCreated(@NonNull TransactionIdentifier transactionId);
233 private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
234 final TransactionProxy parent) {
236 switch (parent.getType()) {
238 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
239 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
241 DOMStoreWriteTransaction getWriteDelegate() {
242 throw new UnsupportedOperationException();
246 DOMStoreReadTransaction getReadDelegate() {
251 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
252 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
254 DOMStoreWriteTransaction getWriteDelegate() {
259 DOMStoreReadTransaction getReadDelegate() {
264 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
265 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
267 DOMStoreWriteTransaction getWriteDelegate() {
272 DOMStoreReadTransaction getReadDelegate() {
273 throw new UnsupportedOperationException();
277 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());