Merge "Created Sample Feature Test Class for Base Feature Repository"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.serialization.Serialization;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
30 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
31 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
32 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
33 import org.opendaylight.controller.cluster.datastore.modification.Modification;
34 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
35 import org.opendaylight.controller.cluster.raft.RaftActor;
36 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
37 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44
45 import java.util.HashMap;
46 import java.util.Map;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.Executors;
49
50 /**
51  * A Shard represents a portion of the logical data tree <br/>
52  * <p>
53  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
54  * </p>
55  */
56 public class Shard extends RaftActor {
57
58     public static final String DEFAULT_NAME = "default";
59
60     private final ListeningExecutorService storeExecutor =
61         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
62
63     private final InMemoryDOMDataStore store;
64
65     private final Map<Object, DOMStoreThreePhaseCommitCohort>
66         modificationToCohort = new HashMap<>();
67
68     private final LoggingAdapter LOG =
69         Logging.getLogger(getContext().system(), this);
70
71     // By default persistent will be true and can be turned off using the system
72     // property persistent
73     private final boolean persistent;
74
75     private final String name;
76
77     private SchemaContext schemaContext;
78
79     private final ShardStats shardMBean;
80
81     private Shard(String name, Map<String, String> peerAddresses) {
82         super(name, peerAddresses);
83
84         this.name = name;
85
86         String setting = System.getProperty("shard.persistent");
87
88         this.persistent = !"false".equals(setting);
89
90         LOG.info("Creating shard : {} persistent : {}", name, persistent);
91
92         store = new InMemoryDOMDataStore(name, storeExecutor);
93
94         shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
95
96     }
97
98     public static Props props(final String name, final Map<String, String> peerAddresses) {
99         return Props.create(new Creator<Shard>() {
100
101             @Override
102             public Shard create() throws Exception {
103                 return new Shard(name, peerAddresses);
104             }
105
106         });
107     }
108
109
110     @Override public void onReceiveCommand(Object message){
111         LOG.debug("Received message {} from {}", message.getClass().toString(), getSender());
112
113         if (message.getClass().equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
114             if(isLeader()) {
115                 createTransactionChain();
116             } else if(getLeader() != null){
117                 getLeader().forward(message, getContext());
118             }
119         } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) {
120             registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message));
121         } else if (message instanceof UpdateSchemaContext) {
122             updateSchemaContext((UpdateSchemaContext) message);
123         } else if (message instanceof ForwardedCommitTransaction) {
124             handleForwardedCommit((ForwardedCommitTransaction) message);
125         } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
126             if(isLeader()) {
127                 createTransaction(CreateTransaction.fromSerializable(message));
128             } else if(getLeader() != null){
129                 getLeader().forward(message, getContext());
130             }
131         } else if (message instanceof PeerAddressResolved){
132             PeerAddressResolved resolved = (PeerAddressResolved) message;
133             setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
134         } else {
135           super.onReceiveCommand(message);
136         }
137     }
138
139     private void createTransaction(CreateTransaction createTransaction) {
140         DOMStoreReadWriteTransaction transaction =
141             store.newReadWriteTransaction();
142         String transactionId = "shard-" + createTransaction.getTransactionId();
143         LOG.info("Creating transaction : {} " , transactionId);
144         ActorRef transactionActor = getContext().actorOf(
145             ShardTransaction.props(transaction, getSelf(), schemaContext), transactionId);
146
147         getSender()
148             .tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(),
149                 getSelf());
150     }
151
152     private void commit(final ActorRef sender, Object serialized) {
153         Modification modification = MutableCompositeModification.fromSerializable(serialized, schemaContext);
154         DOMStoreThreePhaseCommitCohort cohort =
155             modificationToCohort.remove(serialized);
156         if (cohort == null) {
157             LOG.error(
158                 "Could not find cohort for modification : " + modification);
159             LOG.info("Writing modification using a new transaction");
160             modification.apply(store.newReadWriteTransaction());
161             return;
162         }
163
164         final ListenableFuture<Void> future = cohort.commit();
165         shardMBean.incrementCommittedTransactionCount();
166         final ActorRef self = getSelf();
167         future.addListener(new Runnable() {
168             @Override
169             public void run() {
170                 try {
171                     future.get();
172
173                     if(sender != null) {
174                         sender
175                             .tell(new CommitTransactionReply().toSerializable(),
176                                 self);
177                     } else {
178                         LOG.error("sender is null ???");
179                     }
180                 } catch (InterruptedException | ExecutionException e) {
181                     // FIXME : Handle this properly
182                     LOG.error(e, "An exception happened when committing");
183                 }
184             }
185         }, getContext().dispatcher());
186     }
187
188     private void handleForwardedCommit(ForwardedCommitTransaction message) {
189         Object serializedModification = message.getModification().toSerializable();
190
191         modificationToCohort
192             .put(serializedModification , message.getCohort());
193
194         if(persistent) {
195             this.persistData(getSender(), "identifier", new CompositeModificationPayload(serializedModification));
196         } else {
197             this.commit(getSender(), serializedModification);
198         }
199     }
200
201     private void updateSchemaContext(UpdateSchemaContext message) {
202         this.schemaContext = message.getSchemaContext();
203         store.onGlobalContextUpdated(message.getSchemaContext());
204     }
205
206     private void registerChangeListener(
207         RegisterChangeListener registerChangeListener) {
208
209         LOG.debug("registerDataChangeListener for " + registerChangeListener.getPath());
210
211
212         ActorSelection dataChangeListenerPath = getContext()
213             .system().actorSelection(
214                 registerChangeListener.getDataChangeListenerPath());
215
216         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
217             listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
218
219         org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
220             registration =
221             store.registerChangeListener(registerChangeListener.getPath(),
222                 listener, registerChangeListener.getScope());
223         ActorRef listenerRegistration =
224             getContext().actorOf(
225                 DataChangeListenerRegistration.props(registration));
226
227         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
228
229         getSender()
230             .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(),
231                 getSelf());
232     }
233
234     private void createTransactionChain() {
235         DOMStoreTransactionChain chain = store.createTransactionChain();
236         ActorRef transactionChain =
237             getContext().actorOf(
238                 ShardTransactionChain.props(chain, schemaContext));
239         getSender()
240             .tell(new CreateTransactionChainReply(transactionChain.path())
241                 .toSerializable(),
242                 getSelf());
243     }
244
245     @Override protected void applyState(ActorRef clientActor, String identifier,
246         Object data) {
247
248         if(data instanceof CompositeModificationPayload){
249             Object modification =
250                 ((CompositeModificationPayload) data).getModification();
251             commit(clientActor, modification);
252         } else {
253             LOG.error("Unknown state received {}", data);
254         }
255
256     }
257
258     @Override protected Object createSnapshot() {
259         throw new UnsupportedOperationException("createSnapshot");
260     }
261
262     @Override protected void applySnapshot(Object snapshot) {
263         throw new UnsupportedOperationException("applySnapshot");
264     }
265
266     @Override public String persistenceId() {
267         return this.name;
268     }
269 }