7 Clustering is a mechanism that enables multiple processes and programs to work
8 together as one entity. For example, when you search for something on
9 google.com, it may seem like your search request is processed by only one web
10 server. In reality, your search request is processed by may web servers
11 connected in a cluster. Similarly, you can have multiple instances of
12 OpenDaylight working together as one entity.
14 Advantages of clustering are:
16 * Scaling: If you have multiple instances of OpenDaylight running, you can
17 potentially do more work and store more data than you could with only one
18 instance. You can also break up your data into smaller chunks (shards) and
19 either distribute that data across the cluster or perform certain operations
20 on certain members of the cluster.
21 * High Availability: If you have multiple instances of OpenDaylight running and
one of them crashes, you will still have the other instances working and
available.
24 * Data Persistence: You will not lose any data stored in OpenDaylight after a
25 manual restart or a crash.
27 The following sections describe how to set up clustering on both individual and
28 multiple OpenDaylight instances.
30 Multiple Node Clustering
31 ------------------------
33 The following sections describe how to set up multiple node clusters in OpenDaylight.
35 Deployment Considerations
36 ^^^^^^^^^^^^^^^^^^^^^^^^^
38 To implement clustering, the deployment considerations are as follows:
40 * To set up a cluster with multiple nodes, we recommend that you use a minimum
41 of three machines. You can set up a cluster with just two nodes. However, if
one of the two nodes fails, the cluster will not be operational.
44 .. note:: This is because clustering in OpenDaylight requires a majority of the
45 nodes to be up and one node cannot be a majority of two nodes.
47 * Every device that belongs to a cluster needs to have an identifier.
48 OpenDaylight uses the node's ``role`` for this purpose. After you define the
49 first node's role as *member-1* in the ``akka.conf`` file, OpenDaylight uses
50 *member-1* to identify that node.
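For example, the relevant section of the first node's ``akka.conf`` would
contain (a minimal sketch of the setting described above)::

   roles = [
     "member-1"
   ]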
* Data shards are used to contain all or a certain segment of OpenDaylight's
53 MD-SAL datastore. For example, one shard can contain all the inventory data
54 while another shard contains all of the topology data.
56 If you do not specify a module in the ``modules.conf`` file and do not specify
57 a shard in ``module-shards.conf``, then (by default) all the data is placed in
the default shard (which must also be defined in the ``module-shards.conf``
file). Each shard has replicas configured. You can specify the details of
where the replicas reside in the ``module-shards.conf`` file.
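As an illustration, a custom module and its shard could be declared as
follows (a sketch following the format of the files shipped in
``configuration/initial``; the module name and namespace shown are the ones
OpenDaylight uses for topology data)::

   # modules.conf
   modules = [
       {
           name = "topology"
           namespace = "urn:TBD:params:xml:ns:yang:network-topology"
           shard-strategy = "module"
       }
   ]

   # module-shards.conf
   module-shards = [
       {
           name = "topology"
           shards = [
               {
                   name = "topology"
                   replicas = ["member-1"]
               }
           ]
       }
   ]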
62 * If you have a three node cluster and would like to be able to tolerate any
63 single node crashing, a replica of every defined data shard must be running
64 on all three cluster nodes.
66 .. note:: This is because OpenDaylight's clustering implementation requires a
67 majority of the defined shard replicas to be running in order to
68 function. If you define data shard replicas on two of the cluster nodes
and one of those nodes goes down, the corresponding data shards will not
function.
72 * If you have a three node cluster and have defined replicas for a data shard
73 on each of those nodes, that shard will still function even if only two of
74 the cluster nodes are running. Note that if one of those remaining two nodes
75 goes down, the shard will not be operational.
77 * It is recommended that you have multiple seed nodes configured. After a
78 cluster member is started, it sends a message to all of its seed nodes.
79 The cluster member then sends a join command to the first seed node that
80 responds. If none of its seed nodes reply, the cluster member repeats this
81 process until it successfully establishes a connection or it is shut down.
* After a node is unreachable, it remains down for a configurable period of time
84 (10 seconds, by default). Once a node goes down, you need to restart it so
85 that it can rejoin the cluster. Once a restarted node joins a cluster, it
86 will synchronize with the lead node automatically.
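The 10 second default corresponds to the ``auto-down-unreachable-after``
setting in the cluster section of ``akka.conf``, as shown in the sample file
later in this document::

   auto-down-unreachable-after = 10s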
88 .. _getting-started-clustering-scripts:
Clustering Scripts
------------------

OpenDaylight includes some scripts to help with the clustering configuration.
The scripts are stored in the OpenDaylight ``distribution/bin`` folder and
maintained in the distribution project
`repository <https://git.opendaylight.org/gerrit/p/integration/distribution>`_
in the folder ``distribution-karaf/src/main/assembly/bin/``.
102 Configure Cluster Script
103 ^^^^^^^^^^^^^^^^^^^^^^^^
This script is used to configure the cluster parameters (e.g. ``akka.conf``,
``module-shards.conf``) on a member of the controller cluster. The user should
107 restart the node to apply the changes.
The script can be used at any time, even before the controller is started
for the first time. Usage::

   bin/configure_cluster.sh <index> <seed_nodes_list>
118 * index: Integer within 1..N, where N is the number of seed nodes. This indicates
119 which controller node (1..N) is configured by the script.
* seed_nodes_list: List of seed node IP addresses, separated by comma or space.
122 The IP address at the provided index should belong to the member executing
123 the script. When running this script on multiple seed nodes, keep the
124 seed_node_list the same, and vary the index from 1 through N.
126 Optionally, shards can be configured in a more granular way by modifying the
127 file "custom_shard_configs.txt" in the same folder as this tool. Please see
128 that file for more details.
For example::

   bin/configure_cluster.sh 2 192.168.0.1 192.168.0.2 192.168.0.3
The above command will configure member 2 (IP address 192.168.0.2) of a
cluster made of 192.168.0.1, 192.168.0.2 and 192.168.0.3.
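To configure the remaining members, run the same command on each of the other
machines, keeping the seed node list identical and changing only the index::

   # run on 192.168.0.1
   bin/configure_cluster.sh 1 192.168.0.1 192.168.0.2 192.168.0.3

   # run on 192.168.0.3
   bin/configure_cluster.sh 3 192.168.0.1 192.168.0.2 192.168.0.3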
137 Setting Up a Multiple Node Cluster
138 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
140 To run OpenDaylight in a three node cluster, perform the following:
142 First, determine the three machines that will make up the cluster. After that,
143 do the following on each machine:
145 #. Copy the OpenDaylight distribution zip file to the machine.
146 #. Unzip the distribution.
147 #. Open the following .conf files:
* ``configuration/initial/akka.conf``
* ``configuration/initial/module-shards.conf``
152 #. In each configuration file, make the following changes:
Find every instance of the following lines and replace ``127.0.0.1`` with
the hostname or IP address of the machine on which this file resides and
on which OpenDaylight will run::
159 hostname = "127.0.0.1"
.. note:: The value you need to specify will be different for each node in the cluster.
#. Find the following lines and fill in the hostname or IP address of each of
the machines that will be part of the cluster::
168 seed-nodes = ["akka.tcp://opendaylight-cluster-data@${IP_OF_MEMBER1}:2550",
169 <url-to-cluster-member-2>,
170 <url-to-cluster-member-3>]
172 #. Find the following section and specify the role for each member node. Here
173 we assign the first node with the *member-1* role, the second node with the
*member-2* role, and the third node with the *member-3* role::

   roles = [
     "member-1"
   ]
180 .. note:: This step should use a different role on each node.
#. Open the ``configuration/initial/module-shards.conf`` file and update the
replicas so that each shard is replicated to all three nodes::

   replicas = [
       "member-1",
       "member-2",
       "member-3"
   ]
For reference, see the sample config files below.
#. Move into the ``<karaf-distribution-directory>/bin`` directory.
194 #. Run the following command::
196 JAVA_MAX_MEM=4G JAVA_MAX_PERM_MEM=512m ./karaf
198 #. Enable clustering by running the following command at the Karaf command line::
200 feature:install odl-mdsal-clustering
202 OpenDaylight should now be running in a three node cluster. You can use any of
203 the three member nodes to access the data residing in the datastore.
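For example, assuming the ``odl-restconf`` feature is installed and the
default RESTCONF port and credentials are in use (8181 and ``admin``/``admin``;
both depend on your setup), the same data can be read through any member (the
``network-topology`` model is used here purely as an illustration)::

   curl -u admin:admin http://192.168.0.1:8181/restconf/operational/network-topology:network-topology
   curl -u admin:admin http://192.168.0.2:8181/restconf/operational/network-topology:network-topology

Both calls should return the same content, as the requests are served from the
same replicated datastore.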
Sample Config Files
^^^^^^^^^^^^^^^^^^^

Sample ``akka.conf`` file::
212 mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
213 mailbox-capacity = 1000
214 mailbox-push-timeout-time = 100ms
217 metric-capture-enabled = true
221 loggers = ["akka.event.slf4j.Slf4jLogger"]
225 provider = "akka.cluster.ClusterActorRefProvider"
227 java = "akka.serialization.JavaSerializer"
228 proto = "akka.remote.serialization.ProtobufSerializer"
231 serialization-bindings {
232 "com.google.protobuf.Message" = proto
237 log-remote-lifecycle-events = off
239 hostname = "10.194.189.96"
241 maximum-frame-size = 419430400
242 send-buffer-size = 52428800
243 receive-buffer-size = 52428800
248 seed-nodes = ["akka.tcp://opendaylight-cluster-data@10.194.189.96:2550",
249 "akka.tcp://opendaylight-cluster-data@10.194.189.98:2550",
250 "akka.tcp://opendaylight-cluster-data@10.194.189.101:2550"]
252 auto-down-unreachable-after = 10s
264 mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
265 mailbox-capacity = 1000
266 mailbox-push-timeout-time = 100ms
269 metric-capture-enabled = true
273 loggers = ["akka.event.slf4j.Slf4jLogger"]
276 provider = "akka.cluster.ClusterActorRefProvider"
280 log-remote-lifecycle-events = off
282 hostname = "10.194.189.96"
288 seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@10.194.189.96:2551"]
290 auto-down-unreachable-after = 10s
Sample ``module-shards.conf`` file (abbreviated to the ``default`` and
``topology`` shards)::

   module-shards = [
       {
           name = "default"
           shards = [
               {
                   name = "default"
                   replicas = ["member-1",
                               "member-2",
                               "member-3"]
               }
           ]
       },
       {
           name = "topology"
           shards = [
               {
                   name = "topology"
                   replicas = ["member-1",
                               "member-2",
                               "member-3"]
               }
           ]
       }
   ]

Cluster Monitoring
------------------
355 OpenDaylight exposes shard information via MBeans, which can be explored with
356 JConsole, VisualVM, or other JMX clients, or exposed via a REST API using
357 `Jolokia <https://jolokia.org/features-nb.html>`_, provided by the
358 ``odl-jolokia`` Karaf feature. This is convenient, due to a significant focus
359 on REST in OpenDaylight.
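To expose the MBeans over REST, install the feature at the Karaf console::

   feature:install odl-jolokia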
The basic URI that lists a schema of all available MBeans, but not their
content itself, is::

   GET /jolokia/list
366 To read the information about the shards local to the queried OpenDaylight
instance, use the following REST calls. For the config datastore::
369 GET /jolokia/read/org.opendaylight.controller:type=DistributedConfigDatastore,Category=ShardManager,name=shard-manager-config
371 For the operational datastore::
373 GET /jolokia/read/org.opendaylight.controller:type=DistributedOperationalDatastore,Category=ShardManager,name=shard-manager-operational
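For example, assuming the default port and credentials (8181 and
``admin``/``admin``), the operational datastore call can be issued with curl::

   curl -u admin:admin "http://<controller-ip>:8181/jolokia/read/org.opendaylight.controller:type=DistributedOperationalDatastore,Category=ShardManager,name=shard-manager-operational"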
The output contains information on shards present on the node::

   {
     "request": {
       "mbean": "org.opendaylight.controller:Category=ShardManager,name=shard-manager-operational,type=DistributedOperationalDatastore",
       "type": "read"
     },
     "value": {
       "LocalShards": [
         "member-1-shard-default-operational",
         "member-1-shard-entity-ownership-operational",
         "member-1-shard-topology-operational",
         "member-1-shard-inventory-operational",
         "member-1-shard-toaster-operational"
       ],
       "SyncStatus": true,
       "MemberName": "member-1"
     },
     "timestamp": 1483738005,
     "status": 200
   }
The exact names from the "LocalShards" list are needed for further
exploration, as they are used as part of the URI to look up detailed
information on a particular shard. For example, the
``member-1-shard-default-operational`` shard can be queried with::

   GET /jolokia/read/org.opendaylight.controller:Category=Shards,name=member-1-shard-default-operational,type=DistributedOperationalDatastore

An example output looks like this::
404 "mbean": "org.opendaylight.controller:Category=Shards,name=member-1-shard-default-operational,type=DistributedOperationalDatastore",
408 "ReadWriteTransactionCount": 0,
410 "InMemoryJournalLogSize": 1,
411 "ReplicatedToAllIndex": 4,
412 "Leader": "member-1-shard-default-operational",
414 "RaftState": "Leader",
415 "LastCommittedTransactionTime": "2017-01-06 13:19:00.135",
417 "LastLeadershipChangeTime": "2017-01-06 13:18:37.605",
419 "PeerAddresses": "member-3-shard-default-operational: akka.tcp://opendaylight-cluster-data@192.168.16.3:2550/user/shardmanager-operational/member-3-shard-default-operational, member-2-shard-default-operational: akka.tcp://opendaylight-cluster-data@192.168.16.2:2550/user/shardmanager-operational/member-2-shard-default-operational",
420 "WriteOnlyTransactionCount": 0,
421 "FollowerInitialSyncStatus": false,
424 "timeSinceLastActivity": "00:00:00.320",
428 "id": "member-3-shard-default-operational",
432 "timeSinceLastActivity": "00:00:00.320",
436 "id": "member-2-shard-default-operational",
440 "FailedReadTransactionsCount": 0,
441 "StatRetrievalTime": "810.5 μs",
445 "FailedTransactionsCount": 0,
446 "PendingTxCommitQueueSize": 0,
447 "VotedFor": "member-1-shard-default-operational",
448 "SnapshotCaptureInitiated": false,
449 "CommittedTransactionsCount": 6,
450 "TxCohortCacheSize": 0,
451 "PeerVotingStates": "member-3-shard-default-operational: true, member-2-shard-default-operational: true",
453 "StatRetrievalError": null,
456 "AbortTransactionsCount": 0,
457 "ReadOnlyTransactionCount": 0,
458 "ShardName": "member-1-shard-default-operational",
459 "LeadershipChangeCount": 1,
460 "InMemoryJournalDataSize": 450
462 "timestamp": 1483740350,
The output helps identify the shard state (leader/follower, voting/non-voting),
its peers, follower details if the shard is a leader, and other statistics.
The Integration team maintains a Python-based `tool
<https://github.com/opendaylight/integration-test/tree/master/tools/clustering/cluster-monitor>`_
that takes advantage of the above MBeans exposed via Jolokia, and the
*systemmetrics* project offers a DLUX-based UI to display the same
information.
476 .. _cluster_admin_api:
478 Geo-distributed Active/Backup Setup
479 -----------------------------------
481 An OpenDaylight cluster works best when the latency between the nodes is very
small, which in practice means they should be in the same datacenter. It is,
however, desirable to be able to fail over to a different datacenter in case
all primary nodes become unreachable. To achieve that, the cluster can be
expanded with nodes in a different datacenter, but in a way that does not
affect the latency of the primary nodes. To do that, shards on the backup
nodes must be in the "non-voting" state.
489 The API to manipulate voting states on shards is defined as RPCs in the
490 `cluster-admin.yang <https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blob;f=opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang>`_
file in the *controller* project, which is well documented. A summary is
provided below.

.. note:: Unless otherwise indicated, the below POST requests are to be sent
   to any single cluster node.
499 To create an active/backup setup with a 6 node cluster (3 active and 3 backup
nodes in two locations), there is an RPC to set voting states of all shards on
501 a list of nodes to a given state::
503 POST /restconf/operations/cluster-admin:change-member-voting-states-for-all-shards
505 This RPC needs the list of nodes and the desired voting state as input. For
506 creating the backup nodes, this example input can be used::
510 "member-voting-state": [
512 "member-name": "member-4",
516 "member-name": "member-5",
520 "member-name": "member-6",
527 When an active/backup deployment already exists, with shards on the backup
528 nodes in non-voting state, all that is needed for a fail-over from the active
529 "sub-cluster" to backup "sub-cluster" is to flip the voting state of each
530 shard (on each node, active AND backup). That can be easily achieved with the
531 following RPC call (no parameters needed)::
533 POST /restconf/operations/cluster-admin:flip-member-voting-states-for-all-shards
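For example, assuming the default port and credentials, the fail-over can be
triggered from the command line as follows::

   curl -u admin:admin -X POST http://<node-ip>:8181/restconf/operations/cluster-admin:flip-member-voting-states-for-all-shards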
535 If it's an unplanned outage where the primary voting nodes are down, the
536 "flip" RPC must be sent to a backup non-voting node. In this case there are no
537 shard leaders to carry out the voting changes. However there is a special case
538 whereby if the node that receives the RPC is non-voting and is to be changed
539 to voting and there's no leader, it will apply the voting changes locally and
540 attempt to become the leader. If successful, it persists the voting changes
541 and replicates them to the remaining nodes.
543 When the primary site is fixed and you want to fail back to it, care must be
544 taken when bringing the site back up. Because it was down when the voting
545 states were flipped on the secondary, its persisted database won't contain
546 those changes. If brought back up in that state, the nodes will think they're
547 still voting. If the nodes have connectivity to the secondary site, they
548 should follow the leader in the secondary site and sync with it. However if
549 this does not happen then the primary site may elect its own leader thereby
partitioning the two clusters, which can lead to undesirable results. Therefore
551 it is recommended to either clean the databases (i.e., ``journal`` and
552 ``snapshots`` directory) on the primary nodes before bringing them back up or
553 restore them from a recent backup of the secondary site (see section
554 :ref:`cluster_backup_restore`).
It is also possible to gracefully remove a node from a cluster, with the
following RPC::

   POST /restconf/operations/cluster-admin:remove-all-shard-replicas

and example input::

   {
     "input": {
       "member-name": "member-1"
     }
   }
or just one particular shard::

   POST /restconf/operations/cluster-admin:remove-shard-replica

with example input::

   {
     "input": {
       "shard-name": "default",
       "member-name": "member-2",
       "data-store-type": "config"
     }
   }
Now that a (potentially dead/unrecoverable) node has been removed, another one
can be added at runtime, without changing the configuration files of the
healthy nodes (which would require a restart)::
587 POST /restconf/operations/cluster-admin:add-replicas-for-all-shards
No input is required, but this RPC needs to be sent to the new node, to instruct
590 it to replicate all shards from the cluster.
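For example, assuming the default port and credentials, the new node
(``<new-node-ip>``) would be instructed to join as follows::

   curl -u admin:admin -X POST http://<new-node-ip>:8181/restconf/operations/cluster-admin:add-replicas-for-all-shards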
594 While the cluster admin API allows adding and removing shards dynamically,
the ``module-shards.conf`` and ``modules.conf`` files are still used on
596 startup to define the initial configuration of shards. Modifications from
597 the use of the API are not stored to those static files, but to the journal.
599 Extra Configuration Options
600 ---------------------------
602 ============================================== ================= ======= ==============================================================================================================================================================================
603 Name Type Default Description
604 ============================================== ================= ======= ==============================================================================================================================================================================
605 max-shard-data-change-executor-queue-size uint32 (1..max) 1000 The maximum queue size for each shard's data store data change notification executor.
606 max-shard-data-change-executor-pool-size uint32 (1..max) 20 The maximum thread pool size for each shard's data store data change notification executor.
607 max-shard-data-change-listener-queue-size uint32 (1..max) 1000 The maximum queue size for each shard's data store data change listener.
608 max-shard-data-store-executor-queue-size uint32 (1..max) 5000 The maximum queue size for each shard's data store executor.
609 shard-transaction-idle-timeout-in-minutes uint32 (1..max) 10 The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.
610 shard-snapshot-batch-count uint32 (1..max) 20000 The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.
shard-snapshot-data-threshold-percentage       uint8 (1..100)    12      The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken.
shard-heartbeat-interval-in-millis             uint16 (100..max) 500     The interval at which a shard will send a heartbeat message to its remote shard.
613 operation-timeout-in-seconds uint16 (5..max) 5 The maximum amount of time for akka operations (remote or local) to complete before failing.
614 shard-journal-recovery-log-batch-size uint32 (1..max) 5000 The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.
shard-transaction-commit-timeout-in-seconds    uint32 (1..max)   30      The maximum amount of time a shard transaction three-phase commit can be idle without receiving the next messages before it aborts the transaction.
616 shard-transaction-commit-queue-capacity uint32 (1..max) 20000 The maximum allowed capacity for each shard's transaction commit queue.
617 shard-initialization-timeout-in-seconds uint32 (1..max) 300 The maximum amount of time to wait for a shard to initialize from persistence on startup before failing an operation (eg transaction create and change listener registration).
618 shard-leader-election-timeout-in-seconds uint32 (1..max) 30 The maximum amount of time to wait for a shard to elect a leader before failing an operation (eg transaction create).
619 enable-metric-capture boolean false Enable or disable metric capture.
bounded-mailbox-capacity                       uint32 (1..max)   1000    Max queue size that an actor's mailbox can reach.
persistent                                     boolean           true    Enable or disable data persistence.
shard-isolated-leader-check-interval-in-millis uint32 (1..max)   5000    The interval at which the shard leader checks whether a majority of its followers are active; if not, the leader deems itself isolated.
623 ============================================== ================= ======= ==============================================================================================================================================================================
These configuration options are included in the
``etc/org.opendaylight.controller.cluster.datastore.cfg`` configuration file.
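For example, to disable persistence and increase the heartbeat interval, the
file could contain the following lines (a sketch using the option names from
the table above)::

   persistent=false
   shard-heartbeat-interval-in-millis=1000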