Bug 1446: Add JMX stats for clustered data store
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.persistence.RecoveryFailure;
18 import akka.serialization.Serialization;
19
20 import com.google.common.base.Optional;
21 import com.google.common.base.Preconditions;
22 import com.google.common.util.concurrent.FutureCallback;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.protobuf.ByteString;
26 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
29 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
33 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
36 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
38 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
39 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
40 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
41 import org.opendaylight.controller.cluster.datastore.modification.Modification;
42 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
43 import org.opendaylight.controller.cluster.raft.ConfigParams;
44 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
45 import org.opendaylight.controller.cluster.raft.RaftActor;
46 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
47 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
48 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
49 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
53 import org.opendaylight.yangtools.concepts.ListenerRegistration;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57
58 import scala.concurrent.duration.FiniteDuration;
59
60 import java.util.ArrayList;
61 import java.util.HashMap;
62 import java.util.List;
63 import java.util.Map;
64 import java.util.concurrent.ExecutionException;
65 import java.util.concurrent.TimeUnit;
66
67 /**
68  * A Shard represents a portion of the logical data tree <br/>
69  * <p>
70  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
71  * </p>
72  */
73 public class Shard extends RaftActor {
74
75     private static final ConfigParams configParams = new ShardConfigParams();
76
77     public static final String DEFAULT_NAME = "default";
78
79     // The state of this Shard
80     private final InMemoryDOMDataStore store;
81
82     private final Map<Object, DOMStoreThreePhaseCommitCohort>
83         modificationToCohort = new HashMap<>();
84
85     private final LoggingAdapter LOG =
86         Logging.getLogger(getContext().system(), this);
87
88     // By default persistent will be true and can be turned off using the system
89     // property shard.persistent
90     private final boolean persistent;
91
92     /// The name of this shard
93     private final ShardIdentifier name;
94
95     private final ShardStats shardMBean;
96
97     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
98
99     private final DatastoreContext datastoreContext;
100
101
102     private SchemaContext schemaContext;
103
104     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
105             DatastoreContext datastoreContext) {
106         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
107
108         this.name = name;
109         this.datastoreContext = datastoreContext;
110
111         String setting = System.getProperty("shard.persistent");
112
113         this.persistent = !"false".equals(setting);
114
115         LOG.info("Shard created : {} persistent : {}", name, persistent);
116
117         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
118                 datastoreContext.getDataStoreProperties());
119
120         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
121                 datastoreContext.getDataStoreMXBeanType());
122         shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
123         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
124
125     }
126
127     private static Map<String, String> mapPeerAddresses(
128         Map<ShardIdentifier, String> peerAddresses) {
129         Map<String, String> map = new HashMap<>();
130
131         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
132             .entrySet()) {
133             map.put(entry.getKey().toString(), entry.getValue());
134         }
135
136         return map;
137     }
138
139     public static Props props(final ShardIdentifier name,
140         final Map<ShardIdentifier, String> peerAddresses,
141         DatastoreContext datastoreContext) {
142         Preconditions.checkNotNull(name, "name should not be null");
143         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
144         Preconditions.checkNotNull(datastoreContext, "shardContext should not be null");
145
146         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
147     }
148
149     @Override public void onReceiveRecover(Object message) {
150         LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
151             getSender());
152
153         if (message instanceof RecoveryFailure){
154             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
155         } else {
156             super.onReceiveRecover(message);
157         }
158     }
159
160     @Override public void onReceiveCommand(Object message) {
161         LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
162             getSender());
163
164         if (message.getClass()
165             .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
166             if (isLeader()) {
167                 createTransactionChain();
168             } else if (getLeader() != null) {
169                 getLeader().forward(message, getContext());
170             }
171         } else if (message instanceof RegisterChangeListener) {
172             registerChangeListener((RegisterChangeListener) message);
173         } else if (message instanceof UpdateSchemaContext) {
174             updateSchemaContext((UpdateSchemaContext) message);
175         } else if (message instanceof ForwardedCommitTransaction) {
176             handleForwardedCommit((ForwardedCommitTransaction) message);
177         } else if (message.getClass()
178             .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
179             if (isLeader()) {
180                 createTransaction(CreateTransaction.fromSerializable(message));
181             } else if (getLeader() != null) {
182                 getLeader().forward(message, getContext());
183             }
184         } else if (message instanceof PeerAddressResolved) {
185             PeerAddressResolved resolved = (PeerAddressResolved) message;
186             setPeerAddress(resolved.getPeerId().toString(),
187                 resolved.getPeerAddress());
188         } else {
189             super.onReceiveCommand(message);
190         }
191     }
192
193     private ActorRef createTypedTransactionActor(
194         CreateTransaction createTransaction,
195         ShardTransactionIdentifier transactionId) {
196         if (createTransaction.getTransactionType()
197             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
198
199             shardMBean.incrementReadOnlyTransactionCount();
200
201             return getContext().actorOf(
202                 ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
203                         schemaContext,datastoreContext, shardMBean), transactionId.toString());
204
205         } else if (createTransaction.getTransactionType()
206             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
207
208             shardMBean.incrementReadWriteTransactionCount();
209
210             return getContext().actorOf(
211                 ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
212                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
213
214
215         } else if (createTransaction.getTransactionType()
216             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
217
218             shardMBean.incrementWriteOnlyTransactionCount();
219
220             return getContext().actorOf(
221                 ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
222                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
223         } else {
224             throw new IllegalArgumentException(
225                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
226                     + createTransaction.getTransactionType());
227         }
228     }
229
230     private void createTransaction(CreateTransaction createTransaction) {
231
232         ShardTransactionIdentifier transactionId =
233             ShardTransactionIdentifier.builder()
234                 .remoteTransactionId(createTransaction.getTransactionId())
235                 .build();
236         LOG.debug("Creating transaction : {} ", transactionId);
237         ActorRef transactionActor =
238             createTypedTransactionActor(createTransaction, transactionId);
239
240         getSender()
241             .tell(new CreateTransactionReply(
242                     Serialization.serializedActorPath(transactionActor),
243                     createTransaction.getTransactionId()).toSerializable(),
244                 getSelf()
245             );
246     }
247
248     private void commit(final ActorRef sender, Object serialized) {
249         Modification modification = MutableCompositeModification
250             .fromSerializable(serialized, schemaContext);
251         DOMStoreThreePhaseCommitCohort cohort =
252             modificationToCohort.remove(serialized);
253         if (cohort == null) {
254             LOG.debug(
255                 "Could not find cohort for modification : {}. Writing modification using a new transaction",
256                 modification);
257             DOMStoreReadWriteTransaction transaction =
258                 store.newReadWriteTransaction();
259             modification.apply(transaction);
260             DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
261             ListenableFuture<Void> future =
262                 commitCohort.preCommit();
263             try {
264                 future.get();
265                 future = commitCohort.commit();
266                 future.get();
267             } catch (InterruptedException | ExecutionException e) {
268                 shardMBean.incrementFailedTransactionsCount();
269                 LOG.error("Failed to commit", e);
270                 return;
271             }
272             //we want to just apply the recovery commit and return
273             shardMBean.incrementCommittedTransactionCount();
274             return;
275         }
276
277         final ListenableFuture<Void> future = cohort.commit();
278         final ActorRef self = getSelf();
279
280         Futures.addCallback(future, new FutureCallback<Void>() {
281             @Override
282             public void onSuccess(Void v) {
283                 sender.tell(new CommitTransactionReply().toSerializable(), self);
284                 shardMBean.incrementCommittedTransactionCount();
285                 shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
286             }
287
288             @Override
289             public void onFailure(Throwable t) {
290                 LOG.error(t, "An exception happened during commit");
291                 shardMBean.incrementFailedTransactionsCount();
292                 sender.tell(new akka.actor.Status.Failure(t), self);
293             }
294         });
295
296     }
297
298     private void handleForwardedCommit(ForwardedCommitTransaction message) {
299         Object serializedModification =
300             message.getModification().toSerializable();
301
302         modificationToCohort
303             .put(serializedModification, message.getCohort());
304
305         if (persistent) {
306             this.persistData(getSender(), "identifier",
307                 new CompositeModificationPayload(serializedModification));
308         } else {
309             this.commit(getSender(), serializedModification);
310         }
311     }
312
313     private void updateSchemaContext(UpdateSchemaContext message) {
314         this.schemaContext = message.getSchemaContext();
315         store.onGlobalContextUpdated(message.getSchemaContext());
316     }
317
318     private void registerChangeListener(
319         RegisterChangeListener registerChangeListener) {
320
321         LOG.debug("registerDataChangeListener for {}", registerChangeListener
322             .getPath());
323
324
325         ActorSelection dataChangeListenerPath = getContext()
326             .system().actorSelection(
327                 registerChangeListener.getDataChangeListenerPath());
328
329
330         // Notify the listener if notifications should be enabled or not
331         // If this shard is the leader then it will enable notifications else
332         // it will not
333         dataChangeListenerPath
334             .tell(new EnableNotification(isLeader()), getSelf());
335
336         // Now store a reference to the data change listener so it can be notified
337         // at a later point if notifications should be enabled or disabled
338         dataChangeListeners.add(dataChangeListenerPath);
339
340         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
341             listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
342
343         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
344             registration = store.registerChangeListener(registerChangeListener.getPath(),
345                 listener, registerChangeListener.getScope());
346         ActorRef listenerRegistration =
347             getContext().actorOf(
348                 DataChangeListenerRegistration.props(registration));
349
350         LOG.debug(
351             "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
352             , listenerRegistration.path().toString());
353
354         getSender()
355             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
356                 getSelf());
357     }
358
359     private void createTransactionChain() {
360         DOMStoreTransactionChain chain = store.createTransactionChain();
361         ActorRef transactionChain = getContext().actorOf(
362                 ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
363         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
364                 getSelf());
365     }
366
367     @Override protected void applyState(ActorRef clientActor, String identifier,
368         Object data) {
369
370         if (data instanceof CompositeModificationPayload) {
371             Object modification =
372                 ((CompositeModificationPayload) data).getModification();
373
374             if (modification != null) {
375                 commit(clientActor, modification);
376             } else {
377                 LOG.error(
378                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
379                     identifier, clientActor.path().toString());
380             }
381
382         } else {
383             LOG.error("Unknown state received {}", data);
384         }
385
386         // Update stats
387         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
388
389         if (lastLogEntry != null) {
390             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
391             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
392         }
393
394         shardMBean.setCommitIndex(getCommitIndex());
395         shardMBean.setLastApplied(getLastApplied());
396
397     }
398
399     @Override protected void createSnapshot() {
400         throw new UnsupportedOperationException("createSnapshot");
401     }
402
403     @Override protected void applySnapshot(ByteString snapshot) {
404         throw new UnsupportedOperationException("applySnapshot");
405     }
406
407     @Override protected void onStateChanged() {
408         for (ActorSelection dataChangeListener : dataChangeListeners) {
409             dataChangeListener
410                 .tell(new EnableNotification(isLeader()), getSelf());
411         }
412
413         if (getLeaderId() != null) {
414             shardMBean.setLeader(getLeaderId());
415         }
416
417         shardMBean.setRaftState(getRaftState().name());
418         shardMBean.setCurrentTerm(getCurrentTerm());
419     }
420
421     @Override public String persistenceId() {
422         return this.name.toString();
423     }
424
425
426     private static class ShardConfigParams extends DefaultConfigParamsImpl {
427         public static final FiniteDuration HEART_BEAT_INTERVAL =
428             new FiniteDuration(500, TimeUnit.MILLISECONDS);
429
430         @Override public FiniteDuration getHeartBeatInterval() {
431             return HEART_BEAT_INTERVAL;
432         }
433     }
434
435     private static class ShardCreator implements Creator<Shard> {
436
437         private static final long serialVersionUID = 1L;
438
439         final ShardIdentifier name;
440         final Map<ShardIdentifier, String> peerAddresses;
441         final DatastoreContext datastoreContext;
442
443         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
444                 DatastoreContext datastoreContext) {
445             this.name = name;
446             this.peerAddresses = peerAddresses;
447             this.datastoreContext = datastoreContext;
448         }
449
450         @Override
451         public Shard create() throws Exception {
452             return new Shard(name, peerAddresses, datastoreContext);
453         }
454     }
455 }