2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.persistence.Persistent;
18 import akka.persistence.RecoveryCompleted;
19 import akka.persistence.UntypedProcessor;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.ListeningExecutorService;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.NonPersistent;
32 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
33 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
34 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
35 import org.opendaylight.controller.cluster.datastore.modification.Modification;
36 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
37 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
38 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46 import java.util.HashMap;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.Executors;
52 * A Shard represents a portion of the logical data tree <br/>
54 * Our Shard uses InMemoryDOMDataStore as its internal representation and delegates all requests it
57 public class Shard extends UntypedProcessor {
// Name constant with the value "default"; nothing in this listing reads it.
59 public static final String DEFAULT_NAME = "default";
// Executor passed to the InMemoryDOMDataStore constructor: a fixed pool of two
// threads wrapped by MoreExecutors.listeningDecorator.
61 private final ListeningExecutorService storeExecutor =
62 MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
// Backing data store; assigned once in the constructor.
64 private final InMemoryDOMDataStore store;
// Commit cohorts keyed by serialized modification. commit(...) removes the entry
// it looks up; presumably handleForwardedCommit(...) is what adds entries (the
// receiver of its .put call is not visible in this listing) -- confirm.
66 private final Map<Object, DOMStoreThreePhaseCommitCohort>
67 modificationToCohort = new HashMap<>();
// Akka logging adapter obtained from this actor's system.
69 private final LoggingAdapter LOG =
70 Logging.getLogger(getContext().system(), this);
72 // True unless the system property "shard.persistent" is set to exactly "false"
73 // (evaluated once in the constructor).
74 private final boolean persistent;
// Schema context most recently supplied through updateSchemaContext(...); it is
// not assigned anywhere else in this listing.
76 private SchemaContext schemaContext;
// Statistics MBean obtained from ShardMBeanFactory; commit(...) increments its
// committed-transaction counter.
78 private final ShardStats shardMBean;
/**
 * Creates a shard with the given name.
 *
 * Reads the system property "shard.persistent", logs the resulting setting,
 * creates the in-memory data store and obtains the shard statistics MBean.
 * The constructor is private; within this listing it is only invoked from the
 * Creator built in props(String).
 *
 * @param name passed to the InMemoryDOMDataStore constructor and to
 *             ShardMBeanFactory.getShardStatsMBean, and included in the log line
 */
80 private Shard(String name) {
82 String setting = System.getProperty("shard.persistent");
// Persistence is on for every value except the exact string "false"; an unset
// property (null) therefore also means persistent.
84 this.persistent = !"false".equals(setting);
86 LOG.info("Creating shard : {} persistent : {}", name, persistent);
88 store = new InMemoryDOMDataStore(name, storeExecutor);
90 shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
/**
 * Builds the Akka Props used to create a Shard actor.
 *
 * The Props wrap an anonymous Creator whose create() returns
 * {@code new Shard(name)}.
 *
 * @param name the shard name handed to the Shard constructor
 * @return Props produced by Props.create for that Creator
 */
94 public static Props props(final String name) {
95 return Props.create(new Creator<Shard>() {
98 public Shard create() throws Exception {
99 return new Shard(name);
/**
 * Dispatches an incoming message to the matching handler.
 *
 * CreateTransactionChain, RegisterChangeListener and CreateTransaction are
 * matched by comparing the message's exact class with the respective
 * SERIALIZABLE_CLASS; UpdateSchemaContext, ForwardedCommitTransaction,
 * Persistent, NonPersistent and RecoveryCompleted are matched with instanceof.
 * Persistent and NonPersistent messages both have their payload handed to
 * commit(Object).
 *
 * @param message the received message
 * @throws Exception with the text "Not recognized message found message=..."
 *         NOTE(review): the branch guarding this throw is not visible in this
 *         listing; presumably it is the final else of the chain -- confirm.
 */
107 public void onReceive(Object message) throws Exception {
108 LOG.debug("Received message " + message.getClass().toString());
// NOTE(review): the body of this recovery guard is not visible in this listing
// beyond the FIXME; what happens to messages received during recovery must be
// confirmed against the full file.
110 if(!recoveryFinished()){
111 // FIXME : Properly handle recovery
115 if (message.getClass().equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
116 createTransactionChain();
117 } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) {
// The serialized form is converted back to a RegisterChangeListener before handling.
118 registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message));
119 } else if (message instanceof UpdateSchemaContext) {
120 updateSchemaContext((UpdateSchemaContext) message);
121 } else if (message instanceof ForwardedCommitTransaction) {
122 handleForwardedCommit((ForwardedCommitTransaction) message);
123 } else if (message instanceof Persistent) {
124 commit(((Persistent)message).payload());
125 } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
126 createTransaction(CreateTransaction.fromSerializable(message));
127 } else if(message instanceof NonPersistent){
128 commit(((NonPersistent)message).payload());
129 }else if (message instanceof RecoveryCompleted) {
// No statement for RecoveryCompleted is visible in this listing, only the FIXME.
130 //FIXME: PROPERLY HANDLE RECOVERY COMPLETED
133 throw new Exception("Not recognized message found message=" + message);
/**
 * Opens a new read-write transaction on the store and creates a
 * ShardTransaction actor for it through this actor's context.
 *
 * The actor is named "shard-" followed by the request's transaction id. A
 * serialized CreateTransactionReply carrying the new actor's path string and
 * the transaction id is then sent with tell(...).
 * NOTE(review): the receiver of that tell call and its second argument are not
 * visible in this listing; presumably the reply goes to the sender -- confirm.
 *
 * @param createTransaction the request; only its transaction id is read here
 */
137 private void createTransaction(CreateTransaction createTransaction) {
138 DOMStoreReadWriteTransaction transaction =
139 store.newReadWriteTransaction();
// The current schemaContext field is passed to the new actor as-is.
140 ActorRef transactionActor = getContext().actorOf(
141 ShardTransaction.props(transaction, getSelf(), schemaContext), "shard-" + createTransaction.getTransactionId());
143 .tell(new CreateTransactionReply(transactionActor.path().toString(), createTransaction.getTransactionId()).toSerializable(),
/**
 * Commits the cohort registered under the given serialized modification.
 *
 * The cohort is looked up (and removed) from modificationToCohort using the
 * serialized object itself as the key. A listener attached to the commit
 * future sends a serialized CommitTransactionReply to the sender captured
 * before the listener was registered.
 *
 * @param serialized the serialized modification; used both as the map key and
 *                   as input to MutableCompositeModification.fromSerializable
 */
147 private void commit(Object serialized) {
// The deserialized modification is only used in the "Could not find cohort"
// message below.
148 Modification modification = MutableCompositeModification.fromSerializable(serialized, schemaContext);
// remove(...) rather than get(...): the map entry is consumed by this lookup.
149 DOMStoreThreePhaseCommitCohort cohort =
150 modificationToCohort.remove(serialized);
// NOTE(review): the statements around this message (the logging call and
// whatever ends the null branch) are not visible in this listing.
151 if (cohort == null) {
153 "Could not find cohort for modification : " + modification);
// NOTE(review): the committed-transaction counter is incremented right after
// cohort.commit() returns its future, i.e. before the listener below has run.
156 final ListenableFuture<Void> future = cohort.commit();
157 shardMBean.incrementCommittedTransactionCount();
// Sender and self are captured in finals here and used inside the listener.
158 final ActorRef sender = getSender();
159 final ActorRef self = getSelf();
160 future.addListener(new Runnable() {
165 sender.tell(new CommitTransactionReply().toSerializable(), self);
166 } catch (InterruptedException | ExecutionException e) {
167 // FIXME : Handle this properly
// On failure the exception is only logged; no reply to the sender is visible here.
168 LOG.error(e, "An exception happened when committing");
// The listener is executed on the actor context's dispatcher.
171 }, getContext().dispatcher());
/**
 * Handles a ForwardedCommitTransaction: serializes its modification, stores
 * the message's cohort under that serialized object, and forwards the
 * serialized modification back to this actor wrapped in either a Persistent
 * or a NonPersistent message.
 *
 * NOTE(review): the receiver of the put(...) call and the condition choosing
 * between the two forwards are not visible in this listing; presumably they
 * are modificationToCohort and the persistent flag -- confirm.
 *
 * @param message supplies the modification and its commit cohort
 */
174 private void handleForwardedCommit(ForwardedCommitTransaction message) {
// The same serialized object is used as the map key and as the forwarded
// payload; commit(...) later looks the cohort up by that payload.
175 Object serializedModification = message.getModification().toSerializable();
178 .put(serializedModification , message.getCohort());
180 getSelf().forward(Persistent.create(serializedModification),
183 getSelf().forward(NonPersistent.create(serializedModification),
/**
 * Records the schema context carried by the message in the schemaContext
 * field and passes the same context to the store via onGlobalContextUpdated.
 *
 * @param message supplies the new schema context
 */
188 private void updateSchemaContext(UpdateSchemaContext message) {
189 this.schemaContext = message.getSchemaContext();
190 store.onGlobalContextUpdated(message.getSchemaContext());
/**
 * Registers a data change listener with the store on behalf of a remote actor.
 *
 * Steps visible in this listing: resolve an ActorSelection for the listener
 * path given in the request, wrap it in a DataChangeListenerProxy, register
 * that proxy with the store for the request's path and scope, create a
 * DataChangeListenerRegistration actor around the resulting registration, and
 * send a serialized RegisterChangeListenerReply containing that actor's path.
 * NOTE(review): the receiver of the final tell call and its second argument
 * are not visible in this listing; presumably the reply goes to the sender --
 * confirm.
 *
 * @param registerChangeListener the request; its path, scope and data change
 *                               listener path are read here
 */
193 private void registerChangeListener(
194 RegisterChangeListener registerChangeListener) {
196 LOG.debug("registerDataChangeListener for " + registerChangeListener.getPath());
// The selection is resolved against the actor system, not this actor's context.
199 ActorSelection dataChangeListenerPath = getContext()
200 .system().actorSelection(registerChangeListener.getDataChangeListenerPath());
// The proxy is constructed with the current schemaContext field and the selection.
202 AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
203 listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
// NOTE(review): the declaration line naming this variable is not visible; it is
// referenced below as "registration".
205 org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
207 store.registerChangeListener(registerChangeListener.getPath(),
208 listener, registerChangeListener.getScope());
// The registration actor is created without an explicit name.
209 ActorRef listenerRegistration =
210 getContext().actorOf(
211 DataChangeListenerRegistration.props(registration));
213 LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
216 .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(),
220 private void createTransactionChain() {
221 DOMStoreTransactionChain chain = store.createTransactionChain();
222 ActorRef transactionChain =
223 getContext().actorOf(ShardTransactionChain.props(chain, schemaContext));
225 .tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),