Adjust Tx rate limiter for unused transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DistributedDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSystem;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import java.util.concurrent.CountDownLatch;
15 import java.util.concurrent.TimeUnit;
16 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
17 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
18 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
19 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
20 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
21 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  *
39  */
40 public class DistributedDataStore implements DOMStore, SchemaContextListener,
41         DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
42
43     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
44     private static final String UNKNOWN_TYPE = "unknown";
45
46     private static final long READY_WAIT_FACTOR = 3;
47
48     private final ActorContext actorContext;
49     private final long waitTillReadyTimeInMillis;
50
51
52     private AutoCloseable closeable;
53
54     private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
55
56     private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
57
58     private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
59
60     private final String type;
61
62     public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
63             Configuration configuration, DatastoreContext datastoreContext) {
64         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
65         Preconditions.checkNotNull(cluster, "cluster should not be null");
66         Preconditions.checkNotNull(configuration, "configuration should not be null");
67         Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
68
69         this.type = datastoreContext.getDataStoreType();
70
71         String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
72
73         LOG.info("Creating ShardManager : {}", shardManagerId);
74
75         String shardDispatcher =
76                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
77
78         actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
79                 ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch)
80                         .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ),
81                 cluster, configuration, datastoreContext);
82
83         this.waitTillReadyTimeInMillis =
84                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
85
86
87         datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType());
88         datastoreConfigMXBean.setContext(datastoreContext);
89         datastoreConfigMXBean.registerMBean();
90
91         datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContext.getDataStoreMXBeanType(), actorContext);
92         datastoreInfoMXBean.registerMBean();
93     }
94
95     public DistributedDataStore(ActorContext actorContext) {
96         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
97         this.type = UNKNOWN_TYPE;
98         this.waitTillReadyTimeInMillis =
99                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
100
101     }
102
103     public void setCloseable(AutoCloseable closeable) {
104         this.closeable = closeable;
105     }
106
107     @SuppressWarnings("unchecked")
108     @Override
109     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
110                                               ListenerRegistration<L> registerChangeListener(
111         final YangInstanceIdentifier path, L listener,
112         AsyncDataBroker.DataChangeScope scope) {
113
114         Preconditions.checkNotNull(path, "path should not be null");
115         Preconditions.checkNotNull(listener, "listener should not be null");
116
117         LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
118
119         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
120
121         final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
122                 new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
123         listenerRegistrationProxy.init(path, scope);
124
125         return listenerRegistrationProxy;
126     }
127
128     @Override
129     public DOMStoreTransactionChain createTransactionChain() {
130         return new TransactionChainProxy(actorContext);
131     }
132
133     @Override
134     public DOMStoreReadTransaction newReadOnlyTransaction() {
135         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
136     }
137
138     @Override
139     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
140         actorContext.acquireTxCreationPermit();
141         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
142     }
143
144     @Override
145     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
146         actorContext.acquireTxCreationPermit();
147         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
148     }
149
150     @Override
151     public void onGlobalContextUpdated(SchemaContext schemaContext) {
152         actorContext.setSchemaContext(schemaContext);
153     }
154
155     @Override
156     public void onDatastoreContextUpdated(DatastoreContext context) {
157         LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreType());
158
159         actorContext.setDatastoreContext(context);
160         datastoreConfigMXBean.setContext(context);
161     }
162
163     @Override
164     public void close() {
165         datastoreConfigMXBean.unregisterMBean();
166         datastoreInfoMXBean.unregisterMBean();
167
168         if(closeable != null) {
169             try {
170                 closeable.close();
171             } catch (Exception e) {
172                 LOG.debug("Error closing insance", e);
173             }
174         }
175
176         actorContext.shutdown();
177     }
178
179     @VisibleForTesting
180     ActorContext getActorContext() {
181         return actorContext;
182     }
183
184     public void waitTillReady(){
185         LOG.info("Beginning to wait for data store to become ready : {}", type);
186
187         try {
188             if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
189                 LOG.debug("Data store {} is now ready", type);
190             } else {
191                 LOG.error("Shared leaders failed to settle in {} seconds, giving up", TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
192             }
193         } catch (InterruptedException e) {
194             LOG.error("Interrupted while waiting for shards to settle", e);
195         }
196     }
197
198     @VisibleForTesting
199     public CountDownLatch getWaitTillReadyCountDownLatch() {
200         return waitTillReadyCountDownLatch;
201     }
202 }