2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import java.util.Collection;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLong;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
23 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Future;
27 import scala.util.Try;
30 * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
31 * transaction factories.
33 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory>
34 implements ShardInfoListener, AutoCloseable {
35 private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
37 protected static final AtomicLong TX_COUNTER = new AtomicLong();
39 private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
40 private final ActorContext actorContext;
42 protected AbstractTransactionContextFactory(final ActorContext actorContext) {
43 this.actorContext = Preconditions.checkNotNull(actorContext);
46 final ActorContext getActorContext() {
50 private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
51 final LocalTransactionFactory local = knownLocal.get(shardName);
53 if(LOG.isDebugEnabled()) {
54 LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
55 parent.getIdentifier(), shardName, local);
57 return createLocalTransactionContext(local, parent);
63 private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
64 String shardName, TransactionContextWrapper transactionContextAdapter) {
65 if(LOG.isDebugEnabled()) {
66 LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
67 primaryShardInfo.getPrimaryShardActor(), shardName);
70 updateShardInfo(shardName, primaryShardInfo);
72 TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
73 if(localContext != null) {
74 transactionContextAdapter.executePriorTransactionOperations(localContext);
76 RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter,
78 remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
82 private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
83 String shardName, TransactionContextWrapper transactionContextAdapter) {
84 LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
86 transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
87 parent.getIdentifier(), parent.getLimiter()));
90 final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
91 final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
93 Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
94 if(findPrimaryFuture.isCompleted()) {
95 Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
96 if(maybe.isSuccess()) {
97 onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextAdapter);
99 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextAdapter);
102 findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
104 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
105 if (failure == null) {
106 onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextAdapter);
108 onFindPrimaryShardFailure(failure, parent, shardName, transactionContextAdapter);
111 }, actorContext.getClientDispatcher());
114 return transactionContextAdapter;
117 private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
118 final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
119 if (maybeDataTree.isPresent()) {
120 knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()));
121 LOG.debug("Shard {} resolved to local data tree", shardName);
126 public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
127 final F existing = knownLocal.get(shardName);
128 if (existing != null) {
129 if (primaryShardInfo != null) {
130 final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
131 if (maybeDataTree.isPresent()) {
132 final DataTree newDataTree = maybeDataTree.get();
133 final DataTree oldDataTree = dataTreeForFactory(existing);
134 if (!oldDataTree.equals(newDataTree)) {
135 final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree);
136 knownLocal.replace(shardName, existing, newChain);
137 LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree);
143 if (knownLocal.remove(shardName, existing)) {
144 LOG.debug("Shard {} invalidated data tree {}", shardName, existing);
146 LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing);
151 protected String getMemberName() {
152 String memberName = getActorContext().getCurrentMemberName();
153 if (memberName == null) {
154 memberName = "UNKNOWN-MEMBER";
161 * Create an identifier for the next TransactionProxy attached to this component
163 * @return Transaction identifier, may not be null.
165 protected abstract TransactionIdentifier nextIdentifier();
168 * Find the primary shard actor.
170 * @param shardName Shard name
171 * @return Future containing shard information.
173 protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName);
176 * Create local transaction factory for specified shard, backed by specified shard leader
177 * and data tree instance.
181 * @param dataTree Backing data tree instance. The data tree may only be accessed in
183 * @return Transaction factory for local use.
185 protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
188 * Extract the backing data tree from a particular factory.
190 * @param factory Transaction factory
191 * @return Backing data tree
193 protected abstract DataTree dataTreeForFactory(F factory);
196 * Callback invoked from child transactions to push any futures, which need to
197 * be waited for before the next transaction is allocated.
198 * @param cohortFutures Collection of futures
200 protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
202 private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
203 return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()), parent.getCompleter());