BUG 3249 : Operation Limiter not release on completion of operation
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import java.util.Collection;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLong;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
23 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Future;
27 import scala.util.Try;
28
29 /**
30  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
31  * transaction factories.
32  */
33 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory>
34         implements ShardInfoListener, AutoCloseable {
35     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
36
37     protected static final AtomicLong TX_COUNTER = new AtomicLong();
38
39     private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
40     private final ActorContext actorContext;
41
42     protected AbstractTransactionContextFactory(final ActorContext actorContext) {
43         this.actorContext = Preconditions.checkNotNull(actorContext);
44     }
45
46     final ActorContext getActorContext() {
47         return actorContext;
48     }
49
50     private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
51         final LocalTransactionFactory local = knownLocal.get(shardName);
52         if (local != null) {
53             if(LOG.isDebugEnabled()) {
54                 LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
55                         parent.getIdentifier(), shardName, local);
56             }
57             return createLocalTransactionContext(local, parent);
58         }
59
60         return null;
61     }
62
63     private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
64             String shardName, TransactionContextWrapper transactionContextAdapter) {
65         if(LOG.isDebugEnabled()) {
66             LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
67                     primaryShardInfo.getPrimaryShardActor(), shardName);
68         }
69
70         updateShardInfo(shardName, primaryShardInfo);
71
72         TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
73         if(localContext != null) {
74             transactionContextAdapter.executePriorTransactionOperations(localContext);
75         } else {
76             RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter,
77                     parent, shardName);
78             remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
79         }
80     }
81
82     private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
83             String shardName, TransactionContextWrapper transactionContextAdapter) {
84         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
85
86         transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
87                 parent.getIdentifier(), parent.getLimiter()));
88     }
89
90     final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
91         final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
92
93         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
94         if(findPrimaryFuture.isCompleted()) {
95             Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
96             if(maybe.isSuccess()) {
97                 onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextAdapter);
98             } else {
99                 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextAdapter);
100             }
101         } else {
102             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
103                 @Override
104                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
105                     if (failure == null) {
106                         onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextAdapter);
107                     } else {
108                         onFindPrimaryShardFailure(failure, parent, shardName, transactionContextAdapter);
109                     }
110                 }
111             }, actorContext.getClientDispatcher());
112         }
113
114         return transactionContextAdapter;
115     }
116
117     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
118         final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
119         if (maybeDataTree.isPresent()) {
120             knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()));
121             LOG.debug("Shard {} resolved to local data tree", shardName);
122         }
123     }
124
125     @Override
126     public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
127         final F existing = knownLocal.get(shardName);
128         if (existing != null) {
129             if (primaryShardInfo != null) {
130                 final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
131                 if (maybeDataTree.isPresent()) {
132                     final DataTree newDataTree = maybeDataTree.get();
133                     final DataTree oldDataTree = dataTreeForFactory(existing);
134                     if (!oldDataTree.equals(newDataTree)) {
135                         final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree);
136                         knownLocal.replace(shardName, existing, newChain);
137                         LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree);
138                     }
139
140                     return;
141                 }
142             }
143             if (knownLocal.remove(shardName, existing)) {
144                 LOG.debug("Shard {} invalidated data tree {}", shardName, existing);
145             } else {
146                 LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing);
147             }
148         }
149     }
150
151     protected String getMemberName() {
152         String memberName = getActorContext().getCurrentMemberName();
153         if (memberName == null) {
154             memberName = "UNKNOWN-MEMBER";
155         }
156
157         return memberName;
158     }
159
160     /**
161      * Create an identifier for the next TransactionProxy attached to this component
162      * factory.
163      * @return Transaction identifier, may not be null.
164      */
165     protected abstract TransactionIdentifier nextIdentifier();
166
167     /**
168      * Find the primary shard actor.
169      *
170      * @param shardName Shard name
171      * @return Future containing shard information.
172      */
173     protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName);
174
175     /**
176      * Create local transaction factory for specified shard, backed by specified shard leader
177      * and data tree instance.
178      *
179      * @param shardName
180      * @param shardLeader
181      * @param dataTree Backing data tree instance. The data tree may only be accessed in
182      *                 read-only manner.
183      * @return Transaction factory for local use.
184      */
185     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
186
187     /**
188      * Extract the backing data tree from a particular factory.
189      *
190      * @param factory Transaction factory
191      * @return Backing data tree
192      */
193     protected abstract DataTree dataTreeForFactory(F factory);
194
195     /**
196      * Callback invoked from child transactions to push any futures, which need to
197      * be waited for before the next transaction is allocated.
198      * @param cohortFutures Collection of futures
199      */
200     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
201
202     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
203         return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()), parent.getCompleter());
204     }
205 }