Expose more information about a Shard via JMX
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.serialization.Serialization;
18 import com.google.common.base.Optional;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.ListeningExecutorService;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
24 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
30 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
32 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
33 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
34 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
35 import org.opendaylight.controller.cluster.datastore.modification.Modification;
36 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
37 import org.opendaylight.controller.cluster.raft.ConfigParams;
38 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
39 import org.opendaylight.controller.cluster.raft.RaftActor;
40 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
41 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
42 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
43 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
44 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
45 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
49 import scala.concurrent.duration.FiniteDuration;
50
51 import java.util.ArrayList;
52 import java.util.HashMap;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.TimeUnit;
58
59 /**
60  * A Shard represents a portion of the logical data tree <br/>
61  * <p>
62  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
63  * </p>
64  */
65 public class Shard extends RaftActor {
66
67     private static final ConfigParams configParams = new ShardConfigParams();
68
69     public static final String DEFAULT_NAME = "default";
70
71     private final ListeningExecutorService storeExecutor =
72         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
73
74     private final InMemoryDOMDataStore store;
75
76     private final Map<Object, DOMStoreThreePhaseCommitCohort>
77         modificationToCohort = new HashMap<>();
78
79     private final LoggingAdapter LOG =
80         Logging.getLogger(getContext().system(), this);
81
82     // By default persistent will be true and can be turned off using the system
83     // property persistent
84     private final boolean persistent;
85
86     private final String name;
87
88     private volatile SchemaContext schemaContext;
89
90     private final ShardStats shardMBean;
91
92     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
93
94     private Shard(String name, Map<String, String> peerAddresses) {
95         super(name, peerAddresses, Optional.of(configParams));
96
97         this.name = name;
98
99         String setting = System.getProperty("shard.persistent");
100
101         this.persistent = !"false".equals(setting);
102
103         LOG.info("Creating shard : {} persistent : {}", name, persistent);
104
105         store = new InMemoryDOMDataStore(name, storeExecutor);
106
107         shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
108
109     }
110
111     public static Props props(final String name,
112         final Map<String, String> peerAddresses) {
113         return Props.create(new Creator<Shard>() {
114
115             @Override
116             public Shard create() throws Exception {
117                 return new Shard(name, peerAddresses);
118             }
119
120         });
121     }
122
123
124     @Override public void onReceiveCommand(Object message) {
125         LOG.debug("Received message {} from {}", message.getClass().toString(),
126             getSender());
127
128         if (message.getClass()
129             .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
130             if (isLeader()) {
131                 createTransactionChain();
132             } else if (getLeader() != null) {
133                 getLeader().forward(message, getContext());
134             }
135         } else if (message instanceof RegisterChangeListener) {
136             registerChangeListener((RegisterChangeListener) message);
137         } else if (message instanceof UpdateSchemaContext) {
138             updateSchemaContext((UpdateSchemaContext) message);
139         } else if (message instanceof ForwardedCommitTransaction) {
140             handleForwardedCommit((ForwardedCommitTransaction) message);
141         } else if (message.getClass()
142             .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
143             if (isLeader()) {
144                 createTransaction(CreateTransaction.fromSerializable(message));
145             } else if (getLeader() != null) {
146                 getLeader().forward(message, getContext());
147             }
148         } else if (message instanceof PeerAddressResolved) {
149             PeerAddressResolved resolved = (PeerAddressResolved) message;
150             setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
151         } else {
152             super.onReceiveCommand(message);
153         }
154     }
155
156     private ActorRef createTypedTransactionActor(
157         CreateTransaction createTransaction, String transactionId) {
158         if (createTransaction.getTransactionType()
159             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
160             shardMBean.incrementReadOnlyTransactionCount();
161             return getContext().actorOf(
162                 ShardTransaction
163                     .props(store.newReadOnlyTransaction(), getSelf(),
164                         schemaContext), transactionId);
165
166         } else if (createTransaction.getTransactionType()
167             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
168             shardMBean.incrementReadWriteTransactionCount();
169             return getContext().actorOf(
170                 ShardTransaction
171                     .props(store.newReadWriteTransaction(), getSelf(),
172                         schemaContext), transactionId);
173
174
175         } else if (createTransaction.getTransactionType()
176             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
177             shardMBean.incrementWriteOnlyTransactionCount();
178             return getContext().actorOf(
179                 ShardTransaction
180                     .props(store.newWriteOnlyTransaction(), getSelf(),
181                         schemaContext), transactionId);
182         } else {
183             throw new IllegalArgumentException(
184                 "CreateTransaction message has unidentified transaction type="
185                     + createTransaction.getTransactionType());
186         }
187     }
188
189     private void createTransaction(CreateTransaction createTransaction) {
190
191         String transactionId = "shard-" + createTransaction.getTransactionId();
192         LOG.info("Creating transaction : {} ", transactionId);
193         ActorRef transactionActor =
194             createTypedTransactionActor(createTransaction, transactionId);
195
196         getSender()
197             .tell(new CreateTransactionReply(
198                     Serialization.serializedActorPath(transactionActor),
199                     createTransaction.getTransactionId()).toSerializable(),
200                 getSelf()
201             );
202     }
203
204     private void commit(final ActorRef sender, Object serialized) {
205         Modification modification =
206             MutableCompositeModification.fromSerializable(
207                 serialized, schemaContext);
208         DOMStoreThreePhaseCommitCohort cohort =
209             modificationToCohort.remove(serialized);
210         if (cohort == null) {
211             LOG.error(
212                 "Could not find cohort for modification : {}", modification);
213             LOG.info("Writing modification using a new transaction");
214             DOMStoreReadWriteTransaction transaction =
215                 store.newReadWriteTransaction();
216             modification.apply(transaction);
217             DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
218             ListenableFuture<Void> future =
219                 commitCohort.preCommit();
220             try {
221                 future.get();
222                 future = commitCohort.commit();
223                 future.get();
224             } catch (InterruptedException e) {
225                 LOG.error("Failed to commit", e);
226             } catch (ExecutionException e) {
227                 LOG.error("Failed to commit", e);
228             }
229         }
230
231         final ListenableFuture<Void> future = cohort.commit();
232         shardMBean.incrementCommittedTransactionCount();
233         final ActorRef self = getSelf();
234         future.addListener(new Runnable() {
235             @Override
236             public void run() {
237                 try {
238                     future.get();
239
240                     if (sender != null) {
241                         sender
242                             .tell(new CommitTransactionReply().toSerializable(),
243                                 self);
244                     } else {
245                         LOG.error("sender is null ???");
246                     }
247                 } catch (InterruptedException | ExecutionException e) {
248                     // FIXME : Handle this properly
249                     LOG.error(e, "An exception happened when committing");
250                 }
251             }
252         }, getContext().dispatcher());
253     }
254
255     private void handleForwardedCommit(ForwardedCommitTransaction message) {
256         Object serializedModification =
257             message.getModification().toSerializable();
258
259         modificationToCohort
260             .put(serializedModification, message.getCohort());
261
262         if (persistent) {
263             this.persistData(getSender(), "identifier",
264                 new CompositeModificationPayload(serializedModification));
265         } else {
266             this.commit(getSender(), serializedModification);
267         }
268     }
269
270     private void updateSchemaContext(UpdateSchemaContext message) {
271         this.schemaContext = message.getSchemaContext();
272         store.onGlobalContextUpdated(message.getSchemaContext());
273     }
274
275     private void registerChangeListener(
276         RegisterChangeListener registerChangeListener) {
277
278         LOG.debug("registerDataChangeListener for " + registerChangeListener
279             .getPath());
280
281
282         ActorSelection dataChangeListenerPath = getContext()
283             .system().actorSelection(
284                 registerChangeListener.getDataChangeListenerPath());
285
286
287         // Notify the listener if notifications should be enabled or not
288         // If this shard is the leader then it will enable notifications else
289         // it will not
290         dataChangeListenerPath
291             .tell(new EnableNotification(isLeader()), getSelf());
292
293         // Now store a reference to the data change listener so it can be notified
294         // at a later point if notifications should be enabled or disabled
295         dataChangeListeners.add(dataChangeListenerPath);
296
297         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
298             listener =
299             new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
300
301         org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
302             registration =
303             store.registerChangeListener(registerChangeListener.getPath(),
304                 listener, registerChangeListener.getScope());
305         ActorRef listenerRegistration =
306             getContext().actorOf(
307                 DataChangeListenerRegistration.props(registration));
308
309         LOG.debug(
310             "registerDataChangeListener sending reply, listenerRegistrationPath = "
311                 + listenerRegistration.path().toString());
312
313         getSender()
314             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
315                 getSelf());
316     }
317
318     private void createTransactionChain() {
319         DOMStoreTransactionChain chain = store.createTransactionChain();
320         ActorRef transactionChain =
321             getContext().actorOf(
322                 ShardTransactionChain.props(chain, schemaContext));
323         getSender()
324             .tell(new CreateTransactionChainReply(transactionChain.path())
325                     .toSerializable(),
326                 getSelf()
327             );
328     }
329
330     @Override protected void applyState(ActorRef clientActor, String identifier,
331         Object data) {
332
333         if (data instanceof CompositeModificationPayload) {
334             Object modification =
335                 ((CompositeModificationPayload) data).getModification();
336
337             if (modification != null) {
338                 commit(clientActor, modification);
339             } else {
340                 LOG.error("modification is null - this is very unexpected");
341             }
342
343
344         } else {
345             LOG.error("Unknown state received {}", data);
346         }
347
348         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
349
350         if(lastLogEntry != null){
351             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
352             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
353         }
354
355         shardMBean.setCommitIndex(getCommitIndex());
356         shardMBean.setLastApplied(getLastApplied());
357
358     }
359
360     @Override protected Object createSnapshot() {
361         throw new UnsupportedOperationException("createSnapshot");
362     }
363
364     @Override protected void applySnapshot(Object snapshot) {
365         throw new UnsupportedOperationException("applySnapshot");
366     }
367
368     @Override protected void onStateChanged() {
369         for (ActorSelection dataChangeListener : dataChangeListeners) {
370             dataChangeListener
371                 .tell(new EnableNotification(isLeader()), getSelf());
372         }
373
374         if (getLeaderId() != null) {
375             shardMBean.setLeader(getLeaderId());
376         }
377
378         shardMBean.setRaftState(getRaftState().name());
379         shardMBean.setCurrentTerm(getCurrentTerm());
380     }
381
382     @Override public String persistenceId() {
383         return this.name;
384     }
385
386
387     private static class ShardConfigParams extends DefaultConfigParamsImpl {
388         public static final FiniteDuration HEART_BEAT_INTERVAL =
389             new FiniteDuration(500, TimeUnit.MILLISECONDS);
390
391         @Override public FiniteDuration getHeartBeatInterval() {
392             return HEART_BEAT_INTERVAL;
393         }
394     }
395 }