7 Clustering is a mechanism that enables multiple processes and programs to work
8 together as one entity. For example, when you search for something on
9 google.com, it may seem like your search request is processed by only one web
10 server. In reality, your search request is processed by may web servers
11 connected in a cluster. Similarly, you can have multiple instances of
12 OpenDaylight working together as one entity.
14 Advantages of clustering are:
16 * Scaling: If you have multiple instances of OpenDaylight running, you can
17 potentially do more work and store more data than you could with only one
18 instance. You can also break up your data into smaller chunks (shards) and
19 either distribute that data across the cluster or perform certain operations
20 on certain members of the cluster.
21 * High Availability: If you have multiple instances of OpenDaylight running and
22 one of them crashes, you will still have the other instances working and
24 * Data Persistence: You will not lose any data stored in OpenDaylight after a
25 manual restart or a crash.
27 The following sections describe how to set up clustering on both individual and
28 multiple OpenDaylight instances.
30 Multiple Node Clustering
31 ------------------------
33 The following sections describe how to set up multiple node clusters in OpenDaylight.
35 Deployment Considerations
36 ^^^^^^^^^^^^^^^^^^^^^^^^^
38 To implement clustering, the deployment considerations are as follows:
40 * To set up a cluster with multiple nodes, we recommend that you use a minimum
41 of three machines. You can set up a cluster with just two nodes. However, if
42 one of the two nodes fail, the cluster will not be operational.
44 .. note:: This is because clustering in OpenDaylight requires a majority of the
45 nodes to be up and one node cannot be a majority of two nodes.
47 * Every device that belongs to a cluster needs to have an identifier.
48 OpenDaylight uses the node's ``role`` for this purpose. After you define the
49 first node's role as *member-1* in the ``akka.conf`` file, OpenDaylight uses
50 *member-1* to identify that node.
52 * Data shards are used to contain all or a certain segment of a OpenDaylight's
53 MD-SAL datastore. For example, one shard can contain all the inventory data
54 while another shard contains all of the topology data.
56 If you do not specify a module in the ``modules.conf`` file and do not specify
57 a shard in ``module-shards.conf``, then (by default) all the data is placed in
58 the default shard (which must also be defined in ``module-shards.conf`` file).
59 Each shard has replicas configured. You can specify the details of where the
60 replicas reside in ``module-shards.conf`` file.
62 * If you have a three node cluster and would like to be able to tolerate any
63 single node crashing, a replica of every defined data shard must be running
64 on all three cluster nodes.
66 .. note:: This is because OpenDaylight's clustering implementation requires a
67 majority of the defined shard replicas to be running in order to
68 function. If you define data shard replicas on two of the cluster nodes
69 and one of those nodes goes down, the corresponding data shards will not
72 * If you have a three node cluster and have defined replicas for a data shard
73 on each of those nodes, that shard will still function even if only two of
74 the cluster nodes are running. Note that if one of those remaining two nodes
75 goes down, the shard will not be operational.
77 * It is recommended that you have multiple seed nodes configured. After a
78 cluster member is started, it sends a message to all of its seed nodes.
79 The cluster member then sends a join command to the first seed node that
80 responds. If none of its seed nodes reply, the cluster member repeats this
81 process until it successfully establishes a connection or it is shut down.
83 * After a node is unreachable, it remains down for configurable period of time
84 (10 seconds, by default). Once a node goes down, you need to restart it so
85 that it can rejoin the cluster. Once a restarted node joins a cluster, it
86 will synchronize with the lead node automatically.
88 .. _getting-started-clustering-scripts:
93 OpenDaylight includes some scripts to help with the clustering configuration.
97 Scripts are stored in the OpenDaylight distribution/bin folder, and
98 maintained in the distribution project
99 `repository <https://git.opendaylight.org/gerrit/p/integration/distribution>`_
100 in the folder distribution-karaf/src/main/assembly/bin/.
102 Configure Cluster Script
103 ^^^^^^^^^^^^^^^^^^^^^^^^
105 This script is used to configure the cluster parameters (e.g. akka.conf,
106 module-shards.conf) on a member of the controller cluster. The user should
107 restart the node to apply the changes.
111 The script can be used at any time, even before the controller is started
116 bin/configure_cluster.sh <index> <seed_nodes_list>
118 * index: Integer within 1..N, where N is the number of seed nodes. This indicates
119 which controller node (1..N) is configured by the script.
120 * seed_nodes_list: List of seed nodes (IP address), separated by comma or space.
122 The IP address at the provided index should belong to the member executing
123 the script. When running this script on multiple seed nodes, keep the
124 seed_node_list the same, and vary the index from 1 through N.
126 Optionally, shards can be configured in a more granular way by modifying the
127 file "custom_shard_configs.txt" in the same folder as this tool. Please see
128 that file for more details.
132 bin/configure_cluster.sh 2 192.168.0.1 192.168.0.2 192.168.0.3
134 The above command will configure the member 2 (IP address 192.168.0.2) of a
135 cluster made of 192.168.0.1 192.168.0.2 192.168.0.3.
137 Setting Up a Multiple Node Cluster
138 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
140 To run OpenDaylight in a three node cluster, perform the following:
142 First, determine the three machines that will make up the cluster. After that,
143 do the following on each machine:
145 #. Copy the OpenDaylight distribution zip file to the machine.
146 #. Unzip the distribution.
147 #. Open the following .conf files:
149 * configuration/initial/akka.conf
150 * configuration/initial/module-shards.conf
152 #. In each configuration file, make the following changes:
154 Find every instance of the following lines and replace _127.0.0.1_ with the
155 hostname or IP address of the machine on which this file resides and
156 OpenDaylight will run::
159 hostname = "127.0.0.1"
161 .. note:: The value you need to specify will be different for each node in the
164 #. Find the following lines and replace _127.0.0.1_ with the hostname or IP
165 address of any of the machines that will be part of the cluster::
168 seed-nodes = ["akka.tcp://opendaylight-cluster-data@${IP_OF_MEMBER1}:2550",
169 <url-to-cluster-member-2>,
170 <url-to-cluster-member-3>]
172 #. Find the following section and specify the role for each member node. Here
173 we assign the first node with the *member-1* role, the second node with the
174 *member-2* role, and the third node with the *member-3* role::
180 .. note:: This step should use a different role on each node.
182 #. Open the configuration/initial/module-shards.conf file and update the
183 replicas so that each shard is replicated to all three nodes::
191 For reference, view a sample config files <<_sample_config_files,below>>.
193 #. Move into the +<karaf-distribution-directory>/bin+ directory.
194 #. Run the following command::
196 JAVA_MAX_MEM=4G JAVA_MAX_PERM_MEM=512m ./karaf
198 #. Enable clustering by running the following command at the Karaf command line::
200 feature:install odl-mdsal-clustering
202 OpenDaylight should now be running in a three node cluster. You can use any of
203 the three member nodes to access the data residing in the datastore.
208 Sample ``akka.conf`` file::
212 mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
213 mailbox-capacity = 1000
214 mailbox-push-timeout-time = 100ms
217 metric-capture-enabled = true
221 loggers = ["akka.event.slf4j.Slf4jLogger"]
225 provider = "akka.cluster.ClusterActorRefProvider"
227 java = "akka.serialization.JavaSerializer"
228 proto = "akka.remote.serialization.ProtobufSerializer"
231 serialization-bindings {
232 "com.google.protobuf.Message" = proto
237 log-remote-lifecycle-events = off
239 hostname = "10.194.189.96"
241 maximum-frame-size = 419430400
242 send-buffer-size = 52428800
243 receive-buffer-size = 52428800
248 seed-nodes = ["akka.tcp://opendaylight-cluster-data@10.194.189.96:2550",
249 "akka.tcp://opendaylight-cluster-data@10.194.189.98:2550",
250 "akka.tcp://opendaylight-cluster-data@10.194.189.101:2550"]
252 auto-down-unreachable-after = 10s
264 mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
265 mailbox-capacity = 1000
266 mailbox-push-timeout-time = 100ms
269 metric-capture-enabled = true
273 loggers = ["akka.event.slf4j.Slf4jLogger"]
276 provider = "akka.cluster.ClusterActorRefProvider"
280 log-remote-lifecycle-events = off
282 hostname = "10.194.189.96"
288 seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@10.194.189.96:2551"]
290 auto-down-unreachable-after = 10s
295 Sample ``module-shards.conf`` file::
355 OpenDaylight exposes shard information via MBeans, which can be explored with
356 JConsole, VisualVM, or other JMX clients, or exposed via a REST API using
357 `Jolokia <https://jolokia.org/features-nb.html>`_, provided by the
358 ``odl-jolokia`` Karaf feature. This is convenient, due to a significant focus
359 on REST in OpenDaylight.
361 The basic URI that lists a schema of all available MBeans, but not their
366 To read the information about the shards local to the queried OpenDaylight
367 instance use the following REST calls. For the config datastore::
369 GET /jolokia/read/org.opendaylight.controller:type=DistributedConfigDatastore,Category=ShardManager,name=shard-manager-config
371 For the operational datastore::
373 GET /jolokia/read/org.opendaylight.controller:type=DistributedOperationalDatastore,Category=ShardManager,name=shard-manager-operational
375 The output contains information on shards present on the node::
379 "mbean": "org.opendaylight.controller:Category=ShardManager,name=shard-manager-operational,type=DistributedOperationalDatastore",
384 "member-1-shard-default-operational",
385 "member-1-shard-entity-ownership-operational",
386 "member-1-shard-topology-operational",
387 "member-1-shard-inventory-operational",
388 "member-1-shard-toaster-operational"
391 "MemberName": "member-1"
393 "timestamp": 1483738005,
397 The exact names from the "LocalShards" lists are needed for further
398 exploration, as they will be used as part of the URI to look up detailed info
399 on a particular shard. An example output for the
400 ``member-1-shard-default-operational`` looks like this::
404 "mbean": "org.opendaylight.controller:Category=Shards,name=member-1-shard-default-operational,type=DistributedOperationalDatastore",
408 "ReadWriteTransactionCount": 0,
410 "InMemoryJournalLogSize": 1,
411 "ReplicatedToAllIndex": 4,
412 "Leader": "member-1-shard-default-operational",
414 "RaftState": "Leader",
415 "LastCommittedTransactionTime": "2017-01-06 13:19:00.135",
417 "LastLeadershipChangeTime": "2017-01-06 13:18:37.605",
419 "PeerAddresses": "member-3-shard-default-operational: akka.tcp://opendaylight-cluster-data@192.168.16.3:2550/user/shardmanager-operational/member-3-shard-default-operational, member-2-shard-default-operational: akka.tcp://opendaylight-cluster-data@192.168.16.2:2550/user/shardmanager-operational/member-2-shard-default-operational",
420 "WriteOnlyTransactionCount": 0,
421 "FollowerInitialSyncStatus": false,
424 "timeSinceLastActivity": "00:00:00.320",
428 "id": "member-3-shard-default-operational",
432 "timeSinceLastActivity": "00:00:00.320",
436 "id": "member-2-shard-default-operational",
440 "FailedReadTransactionsCount": 0,
441 "StatRetrievalTime": "810.5 μs",
445 "FailedTransactionsCount": 0,
446 "PendingTxCommitQueueSize": 0,
447 "VotedFor": "member-1-shard-default-operational",
448 "SnapshotCaptureInitiated": false,
449 "CommittedTransactionsCount": 6,
450 "TxCohortCacheSize": 0,
451 "PeerVotingStates": "member-3-shard-default-operational: true, member-2-shard-default-operational: true",
453 "StatRetrievalError": null,
456 "AbortTransactionsCount": 0,
457 "ReadOnlyTransactionCount": 0,
458 "ShardName": "member-1-shard-default-operational",
459 "LeadershipChangeCount": 1,
460 "InMemoryJournalDataSize": 450
462 "timestamp": 1483740350,
466 The output helps identifying shard state (leader/follower, voting/non-voting),
467 peers, follower details if the shard is a leader, and other
470 The Integration team is maintaining a Python based `tool
471 <https://github.com/opendaylight/integration-test/tree/master/tools/clustering/cluster-monitor>`_,
472 that takes advantage of the above MBeans exposed via Jolokia.
474 .. _cluster_admin_api:
476 Geo-distributed Active/Backup Setup
477 -----------------------------------
479 An OpenDaylight cluster works best when the latency between the nodes is very
480 small, which practically means they should be in the same datacenter. It is
481 however desirable to have the possibility to fail over to a different
482 datacenter, in case all nodes become unreachable. To achieve that, the cluster
483 can be expanded with nodes in a different datacenter, but in a way that
484 doesn't affect latency of the primary nodes. To do that, shards in the backup
485 nodes must be in "non-voting" state.
487 The API to manipulate voting states on shards is defined as RPCs in the
488 `cluster-admin.yang <https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blob;f=opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang>`_
489 file in the *controller* project, which is well documented. A summary is
494 Unless otherwise indicated, the below POST requests are to be sent to any
497 To create an active/backup setup with a 6 node cluster (3 active and 3 backup
498 nodes in two locations) there is an RPC to set voting states of all shards on
499 a list of nodes to a given state::
501 POST /restconf/operations/cluster-admin:change-member-voting-states-for-all-shards
503 This RPC needs the list of nodes and the desired voting state as input. For
504 creating the backup nodes, this example input can be used::
508 "member-voting-state": [
510 "member-name": "member-4",
514 "member-name": "member-5",
518 "member-name": "member-6",
525 When an active/backup deployment already exists, with shards on the backup
526 nodes in non-voting state, all that is needed for a fail-over from the active
527 "sub-cluster" to backup "sub-cluster" is to flip the voting state of each
528 shard (on each node, active AND backup). That can be easily achieved with the
529 following RPC call (no parameters needed)::
531 POST /restconf/operations/cluster-admin:flip-member-voting-states-for-all-shards
533 If it's an unplanned outage where the primary voting nodes are down, the
534 "flip" RPC must be sent to a backup non-voting node. In this case there are no
535 shard leaders to carry out the voting changes. However there is a special case
536 whereby if the node that receives the RPC is non-voting and is to be changed
537 to voting and there's no leader, it will apply the voting changes locally and
538 attempt to become the leader. If successful, it persists the voting changes
539 and replicates them to the remaining nodes.
541 When the primary site is fixed and you want to fail back to it, care must be
542 taken when bringing the site back up. Because it was down when the voting
543 states were flipped on the secondary, its persisted database won't contain
544 those changes. If brought back up in that state, the nodes will think they're
545 still voting. If the nodes have connectivity to the secondary site, they
546 should follow the leader in the secondary site and sync with it. However if
547 this does not happen then the primary site may elect its own leader thereby
548 partitioning the 2 clusters, which can lead to undesirable results. Therefore
549 it is recommended to either clean the databases (i.e., ``journal`` and
550 ``snapshots`` directory) on the primary nodes before bringing them back up or
551 restore them from a recent backup of the secondary site (see section
552 :ref:`cluster_backup_restore`).
554 If is also possible to gracefully remove a node from a cluster, with the
557 POST /restconf/operations/cluster-admin:remove-all-shard-replicas
563 "member-name": "member-1"
567 or just one particular shard::
569 POST /restconf/operations/cluster-admin:remove-shard-replica
575 "shard-name": "default",
576 "member-name": "member-2",
577 "data-store-type": "config"
581 Now that a (potentially dead/unrecoverable) node was removed, another one can
582 be added at runtime, without changing the configuration files of the healthy
583 nodes (requiring reboot)::
585 POST /restconf/operations/cluster-admin:add-replicas-for-all-shards
587 No input required, but this RPC needs to be sent to the new node, to instruct
588 it to replicate all shards from the cluster.
592 While the cluster admin API allows adding and removing shards dynamically,
593 the ``module-shard.conf`` and ``modules.conf`` files are still used on
594 startup to define the initial configuration of shards. Modifications from
595 the use of the API are not stored to those static files, but to the journal.
597 Extra Configuration Options
598 ---------------------------
600 ============================================== ================= ======= ==============================================================================================================================================================================
601 Name Type Default Description
602 ============================================== ================= ======= ==============================================================================================================================================================================
603 max-shard-data-change-executor-queue-size uint32 (1..max) 1000 The maximum queue size for each shard's data store data change notification executor.
604 max-shard-data-change-executor-pool-size uint32 (1..max) 20 The maximum thread pool size for each shard's data store data change notification executor.
605 max-shard-data-change-listener-queue-size uint32 (1..max) 1000 The maximum queue size for each shard's data store data change listener.
606 max-shard-data-store-executor-queue-size uint32 (1..max) 5000 The maximum queue size for each shard's data store executor.
607 shard-transaction-idle-timeout-in-minutes uint32 (1..max) 10 The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.
608 shard-snapshot-batch-count uint32 (1..max) 20000 The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
609 shard-snapshot-data-threshold-percentage uint8 (1..100) 12 The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken
610 shard-hearbeat-interval-in-millis uint16 (100..max) 500 The interval at which a shard will send a heart beat message to its remote shard.
611 operation-timeout-in-seconds uint16 (5..max) 5 The maximum amount of time for akka operations (remote or local) to complete before failing.
612 shard-journal-recovery-log-batch-size uint32 (1..max) 5000 The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
613 shard-transaction-commit-timeout-in-seconds uint32 (1..max) 30 The maximum amount of time a shard transaction three-phase commit can be idle without receiving the next messages before it aborts the transaction
614 shard-transaction-commit-queue-capacity uint32 (1..max) 20000 The maximum allowed capacity for each shard's transaction commit queue.
615 shard-initialization-timeout-in-seconds uint32 (1..max) 300 The maximum amount of time to wait for a shard to initialize from persistence on startup before failing an operation (eg transaction create and change listener registration).
616 shard-leader-election-timeout-in-seconds uint32 (1..max) 30 The maximum amount of time to wait for a shard to elect a leader before failing an operation (eg transaction create).
617 enable-metric-capture boolean false Enable or disable metric capture.
618 bounded-mailbox-capacity uint32 (1..max) 1000 Max queue size that an actor's mailbox can reach
619 persistent boolean true Enable or disable data persistence
620 shard-isolated-leader-check-interval-in-millis uint32 (1..max) 5000 the interval at which the leader of the shard will check if its majority followers are active and term itself as isolated
621 ============================================== ================= ======= ==============================================================================================================================================================================
623 These configuration options are included in the etc/org.opendaylight.controller.cluster.datastore.cfg configuration file.