Make private methods static
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import java.util.Collection;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLong;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
25 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.util.Try;
30
31 /**
32  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
33  * transaction factories.
34  */
35 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
36     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
37
38     protected static final AtomicLong TX_COUNTER = new AtomicLong();
39
40     private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
41     private final ActorContext actorContext;
42
43     protected AbstractTransactionContextFactory(final ActorContext actorContext) {
44         this.actorContext = Preconditions.checkNotNull(actorContext);
45     }
46
47     final ActorContext getActorContext() {
48         return actorContext;
49     }
50
51     private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
52         final LocalTransactionFactory local = knownLocal.get(shardName);
53         if (local != null) {
54             if(LOG.isDebugEnabled()) {
55                 LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
56                         parent.getIdentifier(), shardName, local);
57             }
58             return createLocalTransactionContext(local, parent);
59         }
60
61         return null;
62     }
63
64     private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
65             String shardName, TransactionContextWrapper transactionContextWrapper) {
66         if(LOG.isDebugEnabled()) {
67             LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
68                     primaryShardInfo.getPrimaryShardActor(), shardName);
69         }
70
71         updateShardInfo(shardName, primaryShardInfo);
72
73         TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
74         if(localContext != null) {
75             transactionContextWrapper.executePriorTransactionOperations(localContext);
76         } else {
77             RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
78                     parent, shardName);
79             remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor(), primaryShardInfo.getPrimaryShardVersion());
80         }
81     }
82
83     private static void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
84             String shardName, TransactionContextWrapper transactionContextWrapper) {
85         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
86
87         transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
88                 parent.getIdentifier()));
89     }
90
91     final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
92         final TransactionContextWrapper transactionContextWrapper =
93                 new TransactionContextWrapper(parent.getIdentifier(), actorContext);
94
95         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
96         if(findPrimaryFuture.isCompleted()) {
97             Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
98             if(maybe.isSuccess()) {
99                 onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
100             } else {
101                 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
102             }
103         } else {
104             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
105                 @Override
106                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
107                     if (failure == null) {
108                         onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
109                     } else {
110                         onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
111                     }
112                 }
113             }, actorContext.getClientDispatcher());
114         }
115
116         return transactionContextWrapper;
117     }
118
119     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
120         final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
121         if (maybeDataTree.isPresent()) {
122             if(!knownLocal.containsKey(shardName)) {
123                 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
124
125                 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
126                 knownLocal.putIfAbsent(shardName, factory);
127             }
128         } else if(knownLocal.containsKey(shardName)) {
129             LOG.debug("Shard {} invalidating local data tree", shardName);
130
131             knownLocal.remove(shardName);
132         }
133     }
134
135     protected String getMemberName() {
136         String memberName = getActorContext().getCurrentMemberName();
137         if (memberName == null) {
138             memberName = "UNKNOWN-MEMBER";
139         }
140
141         return memberName;
142     }
143
144     /**
145      * Create an identifier for the next TransactionProxy attached to this component
146      * factory.
147      * @return Transaction identifier, may not be null.
148      */
149     protected abstract TransactionIdentifier nextIdentifier();
150
151     /**
152      * Find the primary shard actor.
153      *
154      * @param shardName Shard name
155      * @return Future containing shard information.
156      */
157     protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName);
158
159     /**
160      * Create local transaction factory for specified shard, backed by specified shard leader
161      * and data tree instance.
162      *
163      * @param shardName
164      * @param shardLeader
165      * @param dataTree Backing data tree instance. The data tree may only be accessed in
166      *                 read-only manner.
167      * @return Transaction factory for local use.
168      */
169     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
170
171     /**
172      * Callback invoked from child transactions to push any futures, which need to
173      * be waited for before the next transaction is allocated.
174      * @param cohortFutures Collection of futures
175      */
176     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
177
178     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
179                                                                     final TransactionProxy parent) {
180
181         switch(parent.getType()) {
182             case READ_ONLY:
183                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
184                 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
185                     @Override
186                     protected DOMStoreWriteTransaction getWriteDelegate() {
187                         throw new UnsupportedOperationException();
188                     }
189
190                     @Override
191                     protected DOMStoreReadTransaction getReadDelegate() {
192                         return readOnly;
193                     }
194                 };
195             case READ_WRITE:
196                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
197                 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
198                     @Override
199                     protected DOMStoreWriteTransaction getWriteDelegate() {
200                         return readWrite;
201                     }
202
203                     @Override
204                     protected DOMStoreReadTransaction getReadDelegate() {
205                         return readWrite;
206                     }
207                 };
208             case WRITE_ONLY:
209                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
210                 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
211                     @Override
212                     protected DOMStoreWriteTransaction getWriteDelegate() {
213                         return writeOnly;
214                     }
215
216                     @Override
217                     protected DOMStoreReadTransaction getReadDelegate() {
218                         throw new UnsupportedOperationException();
219                     }
220                 };
221              default:
222                  throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
223         }
224     }
225 }