Merge "Fix test coverage not being reported in Sonar"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.serialization.Serialization;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
29 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
31 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
32 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
33 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
34 import org.opendaylight.controller.cluster.datastore.modification.Modification;
35 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
36 import org.opendaylight.controller.cluster.raft.RaftActor;
37 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
38 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45
46 import java.util.ArrayList;
47 import java.util.HashMap;
48 import java.util.List;
49 import java.util.Map;
50 import java.util.concurrent.ExecutionException;
51 import java.util.concurrent.Executors;
52
53 /**
54  * A Shard represents a portion of the logical data tree <br/>
55  * <p>
56  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
57  * </p>
58  */
59 public class Shard extends RaftActor {
60
61     public static final String DEFAULT_NAME = "default";
62
63     private final ListeningExecutorService storeExecutor =
64         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
65
66     private final InMemoryDOMDataStore store;
67
68     private final Map<Object, DOMStoreThreePhaseCommitCohort>
69         modificationToCohort = new HashMap<>();
70
71     private final LoggingAdapter LOG =
72         Logging.getLogger(getContext().system(), this);
73
74     // By default persistent will be true and can be turned off using the system
75     // property persistent
76     private final boolean persistent;
77
78     private final String name;
79
80     private volatile SchemaContext schemaContext;
81
82     private final ShardStats shardMBean;
83
84     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
85
86     private Shard(String name, Map<String, String> peerAddresses) {
87         super(name, peerAddresses);
88
89         this.name = name;
90
91         String setting = System.getProperty("shard.persistent");
92
93         this.persistent = !"false".equals(setting);
94
95         LOG.info("Creating shard : {} persistent : {}", name, persistent);
96
97         store = new InMemoryDOMDataStore(name, storeExecutor);
98
99         shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
100
101     }
102
103     public static Props props(final String name, final Map<String, String> peerAddresses) {
104         return Props.create(new Creator<Shard>() {
105
106             @Override
107             public Shard create() throws Exception {
108                 return new Shard(name, peerAddresses);
109             }
110
111         });
112     }
113
114
115     @Override public void onReceiveCommand(Object message){
116         LOG.debug("Received message {} from {}", message.getClass().toString(), getSender());
117
118         if (message.getClass().equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
119             if(isLeader()) {
120                 createTransactionChain();
121             } else if(getLeader() != null){
122                 getLeader().forward(message, getContext());
123             }
124         } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) {
125             registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message));
126         } else if (message instanceof UpdateSchemaContext) {
127             updateSchemaContext((UpdateSchemaContext) message);
128         } else if (message instanceof ForwardedCommitTransaction) {
129             handleForwardedCommit((ForwardedCommitTransaction) message);
130         } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
131             if(isLeader()) {
132                 createTransaction(CreateTransaction.fromSerializable(message));
133             } else if(getLeader() != null){
134                 getLeader().forward(message, getContext());
135             }
136         } else if (message instanceof PeerAddressResolved){
137             PeerAddressResolved resolved = (PeerAddressResolved) message;
138             setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
139         } else {
140           super.onReceiveCommand(message);
141         }
142     }
143
144    private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
145       if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
146         return getContext().actorOf(
147             ShardTransaction.props( store.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId);
148
149       }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
150         return getContext().actorOf(
151             ShardTransaction.props( store.newReadWriteTransaction(), getSelf(), schemaContext), transactionId);
152
153
154       }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
155         return getContext().actorOf(
156             ShardTransaction.props( store.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId);
157       }else{
158         throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
159       }
160    }
161
162     private void createTransaction(CreateTransaction createTransaction) {
163
164         String transactionId = "shard-" + createTransaction.getTransactionId();
165         LOG.info("Creating transaction : {} " , transactionId);
166         ActorRef transactionActor = createTypedTransactionActor(createTransaction,transactionId);
167
168         getSender()
169             .tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(),
170                 getSelf());
171     }
172
173     private void commit(final ActorRef sender, Object serialized) {
174         Modification modification = MutableCompositeModification.fromSerializable(serialized, schemaContext);
175         DOMStoreThreePhaseCommitCohort cohort =
176             modificationToCohort.remove(serialized);
177         if (cohort == null) {
178             LOG.error(
179                 "Could not find cohort for modification : {}", modification);
180             LOG.info("Writing modification using a new transaction");
181             DOMStoreReadWriteTransaction transaction =
182                 store.newReadWriteTransaction();
183             modification.apply(transaction);
184             DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
185             ListenableFuture<Void> future =
186                 commitCohort.preCommit();
187             try {
188                 future.get();
189                 future = commitCohort.commit();
190                 future.get();
191             } catch (InterruptedException e) {
192                 LOG.error("Failed to commit", e);
193             } catch (ExecutionException e) {
194                 LOG.error("Failed to commit", e);
195             }
196         }
197
198         final ListenableFuture<Void> future = cohort.commit();
199         shardMBean.incrementCommittedTransactionCount();
200         final ActorRef self = getSelf();
201         future.addListener(new Runnable() {
202             @Override
203             public void run() {
204                 try {
205                     future.get();
206
207                     if(sender != null) {
208                         sender
209                             .tell(new CommitTransactionReply().toSerializable(),
210                                 self);
211                     } else {
212                         LOG.error("sender is null ???");
213                     }
214                 } catch (InterruptedException | ExecutionException e) {
215                     // FIXME : Handle this properly
216                     LOG.error(e, "An exception happened when committing");
217                 }
218             }
219         }, getContext().dispatcher());
220     }
221
222     private void handleForwardedCommit(ForwardedCommitTransaction message) {
223         Object serializedModification = message.getModification().toSerializable();
224
225         modificationToCohort
226             .put(serializedModification , message.getCohort());
227
228         if(persistent) {
229             this.persistData(getSender(), "identifier", new CompositeModificationPayload(serializedModification));
230         } else {
231             this.commit(getSender(), serializedModification);
232         }
233     }
234
235     private void updateSchemaContext(UpdateSchemaContext message) {
236         this.schemaContext = message.getSchemaContext();
237         store.onGlobalContextUpdated(message.getSchemaContext());
238     }
239
240     private void registerChangeListener(
241         RegisterChangeListener registerChangeListener) {
242
243         LOG.debug("registerDataChangeListener for " + registerChangeListener.getPath());
244
245
246         ActorSelection dataChangeListenerPath = getContext()
247             .system().actorSelection(
248                 registerChangeListener.getDataChangeListenerPath());
249
250
251         // Notify the listener if notifications should be enabled or not
252         // If this shard is the leader then it will enable notifications else
253         // it will not
254         dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf());
255
256         // Now store a reference to the data change listener so it can be notified
257         // at a later point if notifications should be enabled or disabled
258         dataChangeListeners.add(dataChangeListenerPath);
259
260         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
261             listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
262
263         org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
264             registration =
265             store.registerChangeListener(registerChangeListener.getPath(),
266                 listener, registerChangeListener.getScope());
267         ActorRef listenerRegistration =
268             getContext().actorOf(
269                 DataChangeListenerRegistration.props(registration));
270
271         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
272
273         getSender()
274             .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(),
275                 getSelf());
276     }
277
278     private void createTransactionChain() {
279         DOMStoreTransactionChain chain = store.createTransactionChain();
280         ActorRef transactionChain =
281             getContext().actorOf(
282                 ShardTransactionChain.props(chain, schemaContext));
283         getSender()
284             .tell(new CreateTransactionChainReply(transactionChain.path())
285                 .toSerializable(),
286                 getSelf());
287     }
288
289     @Override protected void applyState(ActorRef clientActor, String identifier,
290         Object data) {
291
292         if(data instanceof CompositeModificationPayload){
293             Object modification =
294                 ((CompositeModificationPayload) data).getModification();
295
296             if(modification != null){
297                 commit(clientActor, modification);
298             } else {
299                 LOG.error("modification is null - this is very unexpected");
300             }
301
302
303         } else {
304             LOG.error("Unknown state received {}", data);
305         }
306
307     }
308
309     @Override protected Object createSnapshot() {
310         throw new UnsupportedOperationException("createSnapshot");
311     }
312
313     @Override protected void applySnapshot(Object snapshot) {
314         throw new UnsupportedOperationException("applySnapshot");
315     }
316
317     @Override protected void onStateChanged() {
318         for(ActorSelection dataChangeListener : dataChangeListeners){
319             dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
320         }
321     }
322
323     @Override public String persistenceId() {
324         return this.name;
325     }
326 }