2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import java.util.Collection;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLong;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
25 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.util.Try;
32 * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
33 * transaction factories.
35 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
36 private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
38 protected static final AtomicLong TX_COUNTER = new AtomicLong();
40 private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
41 private final ActorContext actorContext;
43 protected AbstractTransactionContextFactory(final ActorContext actorContext) {
44 this.actorContext = Preconditions.checkNotNull(actorContext);
47 final ActorContext getActorContext() {
51 private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
52 final LocalTransactionFactory local = knownLocal.get(shardName);
54 if(LOG.isDebugEnabled()) {
55 LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
56 parent.getIdentifier(), shardName, local);
58 return createLocalTransactionContext(local, parent);
64 private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
65 String shardName, TransactionContextWrapper transactionContextWrapper) {
66 if(LOG.isDebugEnabled()) {
67 LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
68 primaryShardInfo.getPrimaryShardActor(), shardName);
71 updateShardInfo(shardName, primaryShardInfo);
73 TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
74 if(localContext != null) {
75 transactionContextWrapper.executePriorTransactionOperations(localContext);
77 RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
79 remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
83 private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
84 String shardName, TransactionContextWrapper transactionContextWrapper) {
85 LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
87 transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
88 parent.getIdentifier()));
91 final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
92 final TransactionContextWrapper transactionContextWrapper =
93 new TransactionContextWrapper(parent.getIdentifier(), actorContext);
95 Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
96 if(findPrimaryFuture.isCompleted()) {
97 Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
98 if(maybe.isSuccess()) {
99 onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
101 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
104 findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
106 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
107 if (failure == null) {
108 onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
110 onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
113 }, actorContext.getClientDispatcher());
116 return transactionContextWrapper;
119 private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
120 final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
121 if (maybeDataTree.isPresent()) {
122 if(!knownLocal.containsKey(shardName)) {
123 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
125 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
126 knownLocal.putIfAbsent(shardName, factory);
128 } else if(knownLocal.containsKey(shardName)) {
129 LOG.debug("Shard {} invalidating local data tree", shardName);
131 knownLocal.remove(shardName);
135 protected String getMemberName() {
136 String memberName = getActorContext().getCurrentMemberName();
137 if (memberName == null) {
138 memberName = "UNKNOWN-MEMBER";
145 * Create an identifier for the next TransactionProxy attached to this component
147 * @return Transaction identifier, may not be null.
149 protected abstract TransactionIdentifier nextIdentifier();
152 * Find the primary shard actor.
154 * @param shardName Shard name
155 * @return Future containing shard information.
157 protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName);
160 * Create local transaction factory for specified shard, backed by specified shard leader
161 * and data tree instance.
165 * @param dataTree Backing data tree instance. The data tree may only be accessed in
167 * @return Transaction factory for local use.
169 protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
172 * Callback invoked from child transactions to push any futures, which need to
173 * be waited for before the next transaction is allocated.
174 * @param cohortFutures Collection of futures
176 protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
178 private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
179 final TransactionProxy parent) {
181 switch(parent.getType()) {
183 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
184 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
186 protected DOMStoreWriteTransaction getWriteDelegate() {
187 throw new UnsupportedOperationException();
191 protected DOMStoreReadTransaction getReadDelegate() {
196 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
197 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
199 protected DOMStoreWriteTransaction getWriteDelegate() {
204 protected DOMStoreReadTransaction getReadDelegate() {
209 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
210 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
212 protected DOMStoreWriteTransaction getWriteDelegate() {
217 protected DOMStoreReadTransaction getReadDelegate() {
218 throw new UnsupportedOperationException();
222 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());