2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
14 import java.util.Collection;
15 import java.util.Optional;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
19 import org.eclipse.jdt.annotation.NonNull;
20 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.MemberName;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
25 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
26 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
27 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
28 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.Future;
32 import scala.util.Try;
35 * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
36 * transaction factories.
38 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
// Class-wide SLF4J logger, used for the debug traces emitted by the methods of this class.
39 private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
// Atomic updater bound by name to the volatile "nextTx" field below. The raw type is used because the
// class literal passed to newUpdater carries no type argument, hence the "rawtypes" suppression.
40 @SuppressWarnings("rawtypes")
41 private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
42 AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
// Local transaction factories keyed by shard name. Entries are added and removed in updateShardInfo
// and looked up in maybeCreateLocalTransactionContext.
44 private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
// History identifier supplied to the constructor; passed to every TransactionIdentifier created by
// nextIdentifier and used as the starting point of getMemberName.
45 private final LocalHistoryIdentifier historyId;
// Actor utilities supplied to the constructor; handed to each TransactionContextWrapper and used to
// obtain the client dispatcher for asynchronous primary-shard lookups.
46 private final ActorUtils actorUtils;
48 // Used via TX_COUNTER_UPDATER
// Transaction counter. It is never referenced directly (hence "unused"); nextIdentifier reads and
// advances it through TX_COUNTER_UPDATER.getAndIncrement.
49 @SuppressWarnings("unused")
50 private volatile long nextTx;
/**
 * Creates a factory bound to the given actor utilities and local history.
 *
 * @param actorUtils actor utilities to use, must not be null
 * @param historyId identifier of the local history this factory issues transactions for, must not be null
 * @throws NullPointerException if either argument is null
 */
protected AbstractTransactionContextFactory(
        final ActorUtils actorUtils,
        final LocalHistoryIdentifier historyId) {
    // Null checks run in declaration order, so a null actorUtils is reported first.
    this.actorUtils = requireNonNull(actorUtils);
    this.historyId = requireNonNull(historyId);
}
// Package-private, final accessor typed as ActorUtils.
// NOTE(review): the method body is not visible in this excerpt; presumably it returns the actorUtils
// field assigned in the constructor - confirm against the full file.
57 final ActorUtils getActorUtils() {
// Package-private, final accessor typed as LocalHistoryIdentifier.
// NOTE(review): the method body is not visible in this excerpt; presumably it returns the historyId
// field assigned in the constructor - confirm against the full file.
61 final LocalHistoryIdentifier getHistoryId() {
// Tries to build a TransactionContext from the local factory cached for the given shard.
//
// parent    - transaction proxy whose identifier and type drive the creation
// shardName - key used to look up the cached local factory in knownLocal
//
// Any Exception thrown by createLocalTransactionContext is not propagated: it is wrapped in a
// NoOpTransactionContext together with the parent's identifier. That broad catch is why the
// checkstyle IllegalCatch rule is suppressed here.
// NOTE(review): the lines guarding the debug log and try block, and the fall-through return, are not
// visible in this excerpt. The caller null-checks the result, so presumably null is returned when no
// local factory is cached - confirm.
65 @SuppressWarnings("checkstyle:IllegalCatch")
66 private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
67 final String shardName) {
// Look up the cached local factory for this shard, if any.
68 final LocalTransactionFactory local = knownLocal.get(shardName);
70 LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
74 return createLocalTransactionContext(local, parent);
75 } catch (Exception e) {
// Convert the failure into a context that carries it, rather than throwing to the caller.
76 return new NoOpTransactionContext(e, parent.getIdentifier());
// Handles a successful primary-shard lookup for one transaction/shard pair.
//
// primaryShardInfo          - result of the lookup
// parent                    - transaction proxy the lookup was made for
// shardName                 - name of the shard that was resolved
// transactionContextWrapper - wrapper that receives the resulting TransactionContext
//
// NOTE(review): the lines between the visible statements (branching and any try/finally) are not
// visible in this excerpt, so the exact control flow below is hedged where it matters.
83 private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
84 String shardName, TransactionContextWrapper transactionContextWrapper) {
85 LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
86 primaryShardInfo.getPrimaryShardActor(), shardName);
// Refresh the knownLocal cache first, so the local-context attempt below sees the current state.
88 updateShardInfo(shardName, primaryShardInfo);
91 TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
// A non-null local context is handed straight to the wrapper.
92 if (localContext != null) {
93 transactionContextWrapper.executePriorTransactionOperations(localContext);
// Presumably the alternative branch, taken when no local context exists: set up remote support and
// give it the resolved primary shard - confirm against the full file.
95 RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
97 remote.setPrimaryShard(primaryShardInfo);
// Notify the subclass that context creation for this transaction has been handled.
100 onTransactionContextCreated(parent.getIdentifier());
// Handles a failed primary-shard lookup for one transaction/shard pair.
//
// failure                   - cause of the lookup failure
// parent                    - transaction proxy the lookup was made for
// shardName                 - name of the shard that could not be resolved
// transactionContextWrapper - wrapper that receives the failure-carrying context
//
// NOTE(review): lines between the visible statements (possibly a try/finally) are not visible in this
// excerpt - confirm the exact structure against the full file.
104 private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
105 String shardName, TransactionContextWrapper transactionContextWrapper) {
// The Throwable is passed as the trailing argument after the two placeholder values.
106 LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
// Hand the wrapper a NoOpTransactionContext built from the failure and the parent's identifier.
109 transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
110 parent.getIdentifier()));
// Notify the subclass, as the success path does.
112 onTransactionContextCreated(parent.getIdentifier());
// Creates a TransactionContextWrapper for the given transaction and shard and starts resolving the
// shard's primary, routing the outcome to onFindPrimaryShardSuccess or onFindPrimaryShardFailure.
//
// parent    - transaction proxy requesting the wrapper
// shardName - name of the shard the wrapper is for
// returns   - the newly created wrapper; when the lookup future is not yet completed, the outcome is
//             delivered later through the registered callback
//
// NOTE(review): the else lines and the @Override marker of the anonymous callback are not visible
// in this excerpt.
116 final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
117 final String shardName) {
118 final TransactionContextWrapper transactionContextWrapper =
119 new TransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName);
// Subclass-provided lookup of the primary shard.
121 Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
// An already completed future is handled right here, without going through the dispatcher.
122 if (findPrimaryFuture.isCompleted()) {
// value() is only unwrapped with get() inside the isCompleted() check above.
123 Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
124 if (maybe.isSuccess()) {
125 onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
127 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
// Register a completion callback that runs on the client dispatcher.
130 findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
132 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
// A null failure is treated as success.
133 if (failure == null) {
134 onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
136 onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
139 }, actorUtils.getClientDispatcher());
142 return transactionContextWrapper;
145 private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
146 final Optional<ReadOnlyDataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
147 if (maybeDataTree.isPresent()) {
148 if (!knownLocal.containsKey(shardName)) {
149 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
151 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
152 knownLocal.putIfAbsent(shardName, factory);
154 } else if (knownLocal.containsKey(shardName)) {
155 LOG.debug("Shard {} invalidating local data tree", shardName);
157 knownLocal.remove(shardName);
161 protected final MemberName getMemberName() {
162 return historyId.getClientId().getFrontendId().getMemberName();
166 * Create an identifier for the next TransactionProxy attached to this component
168 * @return Transaction identifier, may not be null.
170 protected final TransactionIdentifier nextIdentifier() {
171 return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
175 * Find the primary shard actor.
177 * @param shardName Shard name
178 * @return Future containing shard information.
180 protected abstract Future<PrimaryShardInfo> findPrimaryShard(@NonNull String shardName,
181 @NonNull TransactionIdentifier txId);
184 * Create local transaction factory for specified shard, backed by specified shard leader
185 * and data tree instance.
187 * @param shardName the shard name
188 * @param shardLeader the shard leader
189 * @param dataTree Backing data tree instance. The data tree may only be accessed in
191 * @return Transaction factory for local use.
193 protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, ReadOnlyDataTree dataTree);
196 * Callback invoked from child transactions to push any futures, which need to
197 * be waited for before the next transaction is allocated.
198 * @param cohortFutures Collection of futures
200 protected abstract <T> void onTransactionReady(@NonNull TransactionIdentifier transaction,
201 @NonNull Collection<Future<T>> cohortFutures);
204 * Callback invoked when the internal TransactionContext has been created for a transaction.
206 * @param transactionId the ID of the transaction.
208 protected abstract void onTransactionContextCreated(@NonNull TransactionIdentifier transactionId);
// Builds a LocalTransactionContext for the parent transaction, choosing the kind of store transaction
// from parent.getType().
//
// factory - local factory that creates the underlying store transaction
// parent  - transaction proxy supplying the type and identifier
// throws IllegalArgumentException when the parent's type matches none of the handled kinds
//
// NOTE(review): the case labels, the delegate return statements and the closing braces are not
// visible in this excerpt; the per-variant notes below rely on the variable names and visible throws.
210 private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
211 final TransactionProxy parent) {
213 switch (parent.getType()) {
// Read-only variant: wraps a read-only store transaction.
215 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
216 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
// A read-only context has no write delegate, so asking for one throws.
218 protected DOMStoreWriteTransaction getWriteDelegate() {
219 throw new UnsupportedOperationException();
223 protected DOMStoreReadTransaction getReadDelegate() {
// Read-write variant: wraps a read-write store transaction.
228 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
229 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
231 protected DOMStoreWriteTransaction getWriteDelegate() {
236 protected DOMStoreReadTransaction getReadDelegate() {
// Write-only variant: wraps a write-only store transaction.
241 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
242 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
244 protected DOMStoreWriteTransaction getWriteDelegate() {
// A write-only context has no read delegate, so asking for one throws.
249 protected DOMStoreReadTransaction getReadDelegate() {
250 throw new UnsupportedOperationException();
// Any other transaction type is rejected.
254 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());