3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.clustering.services_implementation.internal;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
26 import javax.transaction.HeuristicMixedException;
27 import javax.transaction.HeuristicRollbackException;
28 import javax.transaction.NotSupportedException;
29 import javax.transaction.RollbackException;
30 import javax.transaction.SystemException;
31 import javax.transaction.Transaction;
32 import javax.transaction.TransactionManager;
34 import org.infinispan.Cache;
35 import org.infinispan.configuration.cache.Configuration;
36 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
37 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
38 import org.infinispan.configuration.parsing.ParserRegistry;
39 import org.infinispan.manager.DefaultCacheManager;
40 import org.infinispan.manager.EmbeddedCacheManager;
41 import org.infinispan.notifications.Listener;
42 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
43 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
44 import org.infinispan.remoting.transport.Address;
45 import org.infinispan.remoting.transport.Transport;
46 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
47 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
48 import org.jgroups.Channel;
49 import org.jgroups.Event;
50 import org.jgroups.stack.GossipRouter;
51 import org.opendaylight.controller.clustering.services.CacheConfigException;
52 import org.opendaylight.controller.clustering.services.CacheExistException;
53 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
54 import org.opendaylight.controller.clustering.services.IClusterServices;
55 import org.opendaylight.controller.clustering.services.IGetUpdates;
56 import org.opendaylight.controller.clustering.services.IListenRoleChange;
57 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
61 public class ClusterManager implements IClusterServices {
// Class-wide SLF4J logger.
62 protected static final Logger logger = LoggerFactory
63 .getLogger(ClusterManager.class);
// Infinispan cache manager. Assigned in start(); the cache, transaction and
// cluster-membership methods copy it to a local and null-check it before use.
64 private DefaultCacheManager cm;
// Result of startGossiper(); start() only starts it when it is non-null.
65 GossipRouter gossiper;
// Role-change subscribers. Created lazily by listenRoleChange() and reset to
// null by unlistenRoleChange().
66 private HashSet<IListenRoleChange> roleChangeListeners;
// Cache-manager listener created together with roleChangeListeners and sharing
// that same set instance.
67 private ViewChangedListener cacheManagerListener;
// Literal loopback address; getClusteredControllers() drops members that
// resolve to it.
69 private static String loopbackAddress = "127.0.0.1";
72 * Start a JGroups GossipRouter if we are a supernode. The
73 * GossipRouter is nothing more than a simple
74 * rendezvous point. All the nodes that want to join the cluster
75 * will come to any of the rendezvous points and they introduce the
76 * nodes to all the others. Once the meet and greet phase is over,
77 * the nodes will open a full-mesh with the remaining n-1 nodes,
78 * so even if the GossipRouter goes down nothing is lost.
79 * NOTE: This function has the side effect of setting some of the
80 * JGROUPS configuration properties, because this function already
81 * retrieves some of the network capabilities of the device,
82 * so it's better not to do that again
85 * @return GossipRouter
// NOTE(review): this listing embeds the original line numbers and omits several
// short lines (visible as gaps in that numbering), so some statements, else
// branches and closing braces of this method are not shown here.
87 private GossipRouter startGossiper() {
88 boolean amIGossipRouter = false;
// Port used whenever a supernode entry carries no port or an unusable one.
89 Integer gossipRouterPortDefault = 12001;
90 Integer gossipRouterPort = gossipRouterPortDefault;
91 InetAddress gossipRouterAddress = null;
// NOTE(review): the default-value argument of this getProperty call sits on a
// line that is missing from this listing.
92 String supernodes_list = System.getProperty("supernodes",
94 StringBuffer sanitized_supernodes_list = new StringBuffer();
95 List<InetAddress> myAddresses = new ArrayList<InetAddress>();
// Supernode entries are separated by ':'.
97 StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
98 if (supernodes.hasMoreTokens()) {
99 // Populate the list of my addresses
101 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
102 while (e.hasMoreElements()) {
103 NetworkInterface n = e.nextElement();
104 Enumeration<InetAddress> ee = n.getInetAddresses();
105 while (ee.hasMoreElements()) {
106 InetAddress i = ee.nextElement();
110 } catch (SocketException se) {
111 logger.error("Cannot get the list of network interfaces");
// Walk every supernode entry, resolving its host and, when present, its port.
115 while (supernodes.hasMoreTokens()) {
116 String curr_supernode = supernodes.nextToken();
117 logger.debug("Examining supernode {}", curr_supernode);
// NOTE(review): the host/port delimiter argument is on a missing line.
118 StringTokenizer host_port = new StringTokenizer(curr_supernode,
122 Integer port_num = gossipRouterPortDefault;
// An entry with more than two tokens is reported as a parse error.
123 if (host_port.countTokens() > 2) {
124 logger.error("Error parsing supernode {} proceed to the next one",
128 host = host_port.nextToken();
129 InetAddress hostAddr;
131 hostAddr = InetAddress.getByName(host);
132 } catch (UnknownHostException ue) {
133 logger.error("Host not known");
136 if (host_port.hasMoreTokens()) {
137 port = host_port.nextToken();
139 port_num = Integer.valueOf(port);
140 } catch (NumberFormatException ne) {
142 .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
143 port_num = gossipRouterPortDefault;
// Ports outside 0..65535 are replaced by the default port.
145 if ((port_num > 65535) || (port_num < 0)) {
147 .error("Supplied supernode gossip port is outside a valid TCP port range");
148 port_num = gossipRouterPortDefault;
// Only the first supernode entry that matches one of this node's own
// addresses selects the gossip router address and port.
151 if (!amIGossipRouter) {
153 for (InetAddress myAddr : myAddresses) {
154 if (myAddr.equals(hostAddr)) {
155 amIGossipRouter = true;
156 gossipRouterAddress = hostAddr;
157 gossipRouterPort = port_num;
// Accumulate the resolved supernode addresses as a comma-separated list.
163 if (!sanitized_supernodes_list.toString().equals("")) {
164 sanitized_supernodes_list.append(",");
166 sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
170 if (amIGossipRouter) {
171 // Set the Jgroups binding interface to the one we got
172 // from the supernodes attribute
173 if (gossipRouterAddress != null) {
174 System.setProperty("jgroups.tcp.address", gossipRouterAddress
178 // Set the Jgroup binding interface to the one we are well
179 // known outside or else to the first with non-local
182 String myBind = InetAddress.getLocalHost().getHostAddress();
184 || InetAddress.getLocalHost().isLoopbackAddress()) {
185 for (InetAddress myAddr : myAddresses) {
186 if (myAddr.isLoopbackAddress()
187 || myAddr.isLinkLocalAddress()) {
188 logger.debug("Skipping local address {}",
189 myAddr.getHostAddress());
192 // First non-local address
193 myBind = myAddr.getHostAddress();
194 logger.debug("First non-local address {}", myBind);
// An already-set jgroups.tcp.address property is left untouched; it is only
// filled in (with myBind, or with 127.0.0.1) when it is currently unset.
199 String jgroupAddress = System
200 .getProperty("jgroups.tcp.address");
201 if (jgroupAddress == null) {
202 if (myBind != null) {
203 logger.debug("Set bind address to be {}", myBind);
204 System.setProperty("jgroups.tcp.address", myBind);
207 .debug("Set bind address to be LOCALHOST=127.0.0.1");
208 System.setProperty("jgroups.tcp.address", "127.0.0.1");
211 logger.debug("jgroup.tcp.address already set to be {}",
214 } catch (UnknownHostException uhe) {
216 .error("Met UnknownHostException while trying to get binding address for jgroups");
220 // The supernodes list constitute also the tcpgossip initial
222 System.setProperty("jgroups.tcpgossip.initial_hosts",
223 sanitized_supernodes_list.toString());
224 logger.debug("jgroups.tcp.address set to {}",
225 System.getProperty("jgroups.tcp.address"));
226 logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
227 System.getProperty("jgroups.tcpgossip.initial_hosts"));
// A GossipRouter instance is only created when this node was recognised as a
// supernode above; otherwise res stays null.
// NOTE(review): the return statement is not visible in this listing.
228 GossipRouter res = null;
229 if (amIGossipRouter) {
230 logger.info("I'm a GossipRouter will listen on port {}",
232 // Start a GossipRouter with JMX support
233 res = new GossipRouter(gossipRouterPort, null, true);
/**
 * Starts the gossip router, when startGossiper() yields one, and then builds
 * and starts the Infinispan cache manager from the configured XML file.
 * Failures in either step are caught and logged.
 */
238 public void start() {
239 this.gossiper = startGossiper();
240 if (this.gossiper != null) {
241 logger.debug("Trying to start Gossiper");
243 this.gossiper.start();
244 logger.info("Started GossipRouter");
245 } catch (Exception e) {
246 logger.error("GossipRouter didn't start. Exception Stack Trace",
250 logger.info("Starting the ClusterManager");
252 ParserRegistry parser = new ParserRegistry(this.getClass()
// The configuration file path comes from the org.infinispan.config.file system
// property, defaulting to config/infinispan-config.xml.
254 String infinispanConfigFile =
255 System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
256 logger.debug("Using configuration file:{}", infinispanConfigFile);
257 ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
// Install a ClassResolver on the global serialization configuration before the
// manager is created.
258 GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
259 globalBuilder.serialization()
260 .classResolver(new ClassResolver())
// The manager is constructed with its second argument false and then started
// explicitly via startCache() below.
262 this.cm = new DefaultCacheManager(holder, false);
263 logger.debug("Allocated ClusterManager");
264 if (this.cm != null) {
266 this.cm.startCache();
267 logger.debug("Started the ClusterManager");
269 } catch (Exception ioe) {
270 logger.error("Cannot configure infinispan .. bailing out ");
271 logger.error("Stack Trace that raised th exception");
272 logger.error("",ioe);
276 logger.debug("Cache Manager has value {}", this.cm);
// NOTE(review): the enclosing method's signature line is missing from this
// listing; judging by the log message this is the shutdown path - confirm.
280 logger.info("Stopping the ClusterManager");
// NOTE(review): the statements inside this branch after the log call are not
// visible here.
281 if (this.cm != null) {
282 logger.info("Found a valid ClusterManager, now let it be stopped");
// Stop the gossip router, if one was created, and drop the reference.
286 if (this.gossiper != null) {
287 this.gossiper.stop();
288 this.gossiper = null;
/**
 * Creates the cache named {@code "{containerName}_{cacheName}"} in the cache
 * manager.
 *
 * @param containerName container the cache belongs to; first part of the real cache name
 * @param cacheName name of the cache inside the container; second part of the real cache name
 * @param cMode requested cache modes
 * @return NOTE(review): the return statements are on lines missing from this
 *         listing; presumably the cache obtained from the manager - confirm
 * @throws CacheExistException if a cache with the resulting real name already exists
 * @throws CacheConfigException if {@code cMode} contains both
 *         {@code NON_TRANSACTIONAL} and {@code TRANSACTIONAL}
 */
293 public ConcurrentMap<?, ?> createCache(String containerName,
294 String cacheName, Set<cacheMode> cMode) throws CacheExistException,
295 CacheConfigException {
296 EmbeddedCacheManager manager = this.cm;
297 Cache<Object,Object> c;
298 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
// NOTE(review): the body of the null-manager branch is not visible here.
299 if (manager == null) {
303 if (manager.cacheExists(realCacheName)) {
304 throw new CacheExistException();
307 // Sanity check to avoid contrasting parameters
308 if (cMode.containsAll(EnumSet.of(
309 IClusterServices.cacheMode.NON_TRANSACTIONAL,
310 IClusterServices.cacheMode.TRANSACTIONAL))) {
311 throw new CacheConfigException();
// Non-transactional caches are obtained directly from the manager.
314 if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
315 c = manager.getCache(realCacheName);
// Transactional caches first register the "transactional-type" configuration
// under the new cache name, then obtain the cache.
317 } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
318 Configuration rc = manager
319 .getCacheConfiguration("transactional-type");
320 manager.defineConfiguration(realCacheName, rc);
321 c = manager.getCache(realCacheName);
/**
 * Looks up the cache named {@code "{containerName}_{cacheName}"}; the manager
 * is only asked for the cache when it reports that the cache exists.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @return NOTE(review): the return statements are on lines missing from this
 *         listing - confirm what is returned when the manager or cache is absent
 */
328 public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
329 EmbeddedCacheManager manager = this.cm;
330 Cache<Object,Object> c;
331 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
332 if (manager == null) {
336 if (manager.cacheExists(realCacheName)) {
337 c = manager.getCache(realCacheName);
/**
 * Removes the cache named {@code "{containerName}_{cacheName}"} from the cache
 * manager when the manager reports that it exists.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 */
344 public void destroyCache(String containerName, String cacheName) {
345 EmbeddedCacheManager manager = this.cm;
346 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
// NOTE(review): the body of the null-manager branch is not visible here.
347 if (manager == null) {
350 if (manager.cacheExists(realCacheName)) {
351 manager.removeCache(realCacheName);
/**
 * Reports whether the cache manager knows a cache named
 * {@code "{containerName}_{cacheName}"}.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @return the result of {@code cacheExists} on the real cache name.
 *         NOTE(review): the value returned when no cache manager is available
 *         is on a line missing from this listing.
 */
356 public boolean existCache(String containerName, String cacheName) {
357 EmbeddedCacheManager manager = this.cm;
358 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
359 if (manager == null) {
362 return manager.cacheExists(realCacheName);
/**
 * Collects the names of the running caches that belong to one container.
 *
 * @param containerName container whose caches are listed
 * @return the cache-name part of every running cache whose real name starts
 *         with {@code "{containerName}_"}.
 *         NOTE(review): the result when no cache manager is available is on a
 *         line missing from this listing.
 */
366 public Set<String> getCacheList(String containerName) {
367 Set<String> perContainerCaches = new HashSet<String>();
368 EmbeddedCacheManager manager = this.cm;
369 if (manager == null) {
372 for (String cacheName : manager.getCacheNames()) {
// Caches that are defined but not running are skipped.
373 if (!manager.isRunning(cacheName)) continue;
374 if (cacheName.startsWith("{" + containerName + "}_")) {
// Splitting "{container}_{cache}" on braces yields
// ["", container, "_", cache], so index 3 is the bare cache name.
375 String[] res = cacheName.split("[{}]");
376 if (res.length >= 4 && res[1].equals(containerName)
377 && res[2].equals("_")) {
378 perContainerCaches.add(res[3]);
383 return (perContainerCaches);
/**
 * Builds a {@link Properties} object describing the configuration of the cache
 * named {@code "{containerName}_{cacheName}"}: the string forms of its
 * transaction, clustering and locking configuration, keyed by the
 * corresponding {@code IClusterServices.cacheProps} names.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @return NOTE(review): the return statements are on lines missing from this
 *         listing; presumably the populated properties - confirm, including
 *         what is returned when the manager or the cache is absent
 */
387 public Properties getCacheProperties(String containerName, String cacheName) {
388 EmbeddedCacheManager manager = this.cm;
389 if (manager == null) {
392 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
393 if (!manager.cacheExists(realCacheName)) {
396 Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
397 .getCacheConfiguration();
398 Properties p = new Properties();
399 p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
400 conf.transaction().toString());
401 p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
402 conf.clustering().toString());
403 p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
404 .locking().toString());
/**
 * Wraps an update listener in a {@code CacheListenerContainer} for the cache
 * named {@code "{containerName}_{cacheName}"}.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @param u listener to wrap
 * @throws CacheListenerAddException if the cache does not exist.
 *         NOTE(review): the null-manager branch body is not visible in this
 *         listing and may throw as well.
 */
409 public void addListener(String containerName, String cacheName,
410 IGetUpdates<?, ?> u) throws CacheListenerAddException {
411 EmbeddedCacheManager manager = this.cm;
412 Cache<Object,Object> c;
413 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
414 if (manager == null) {
418 if (!manager.cacheExists(realCacheName)) {
419 throw new CacheListenerAddException();
421 c = manager.getCache(realCacheName);
// NOTE(review): the call that registers cl on the cache is presumably on a
// line missing from this listing - confirm.
422 CacheListenerContainer cl = new CacheListenerContainer(u,
423 containerName, cacheName);
/**
 * Collects the update listeners wrapped by the {@code CacheListenerContainer}
 * instances currently registered on the cache named
 * {@code "{containerName}_{cacheName}"}. Listeners of other types are ignored.
 *
 * NOTE(review): the rest of the parameter list and the return statements are
 * on lines missing from this listing; presumably the collected set is returned.
 */
428 public Set<IGetUpdates<?, ?>> getListeners(String containerName,
430 EmbeddedCacheManager manager = this.cm;
431 Cache<Object,Object> c;
432 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
433 if (manager == null) {
437 if (!manager.cacheExists(realCacheName)) {
440 c = manager.getCache(realCacheName);
442 Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
443 Set<Object> listeners = c.getListeners();
444 for (Object listener : listeners) {
445 if (listener instanceof CacheListenerContainer) {
446 CacheListenerContainer cl = (CacheListenerContainer) listener;
447 res.add(cl.whichListener());
/**
 * Unregisters, from the cache named {@code "{containerName}_{cacheName}"},
 * the {@code CacheListenerContainer} that wraps the given update listener.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @param u listener to remove; matched by reference identity
 */
455 public void removeListener(String containerName, String cacheName,
456 IGetUpdates<?, ?> u) {
457 EmbeddedCacheManager manager = this.cm;
458 Cache<Object,Object> c;
459 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
460 if (manager == null) {
464 if (!manager.cacheExists(realCacheName)) {
467 c = manager.getCache(realCacheName);
469 Set<Object> listeners = c.getListeners();
470 for (Object listener : listeners) {
471 if (listener instanceof CacheListenerContainer) {
472 CacheListenerContainer cl = (CacheListenerContainer) listener;
// Identity comparison (==), not equals(): only the very same listener
// instance that was registered is removed.
473 if (cl.whichListener() == u) {
474 c.removeListener(listener);
/**
 * Begins a transaction using the transaction manager of the
 * "transactional-type" cache.
 * NOTE(review): the call on the transaction manager and the guard around the
 * second IllegalStateException are on lines missing from this listing;
 * presumably a null transaction manager is rejected and tm.begin() is called.
 *
 * @throws IllegalStateException if no cache manager is available
 */
482 public void tbegin() throws NotSupportedException, SystemException {
483 EmbeddedCacheManager manager = this.cm;
484 if (manager == null) {
485 throw new IllegalStateException();
487 TransactionManager tm = manager.getCache("transactional-type")
488 .getAdvancedCache().getTransactionManager();
490 throw new IllegalStateException();
/**
 * Commits the current transaction using the transaction manager of the
 * "transactional-type" cache.
 * NOTE(review): the call on the transaction manager and the guard around the
 * second IllegalStateException are on lines missing from this listing;
 * presumably a null transaction manager is rejected and tm.commit() is called.
 *
 * @throws IllegalStateException if no cache manager is available
 */
496 public void tcommit() throws RollbackException, HeuristicMixedException,
497 HeuristicRollbackException, java.lang.SecurityException,
498 java.lang.IllegalStateException, SystemException {
499 EmbeddedCacheManager manager = this.cm;
500 if (manager == null) {
501 throw new IllegalStateException();
503 TransactionManager tm = manager.getCache("transactional-type")
504 .getAdvancedCache().getTransactionManager();
506 throw new IllegalStateException();
/**
 * Rolls back the current transaction using the transaction manager of the
 * "transactional-type" cache.
 * NOTE(review): the call on the transaction manager and the guard around the
 * second IllegalStateException are on lines missing from this listing;
 * presumably a null transaction manager is rejected and tm.rollback() is called.
 *
 * @throws IllegalStateException if no cache manager is available
 */
512 public void trollback() throws java.lang.IllegalStateException,
513 java.lang.SecurityException, SystemException {
514 EmbeddedCacheManager manager = this.cm;
515 if (manager == null) {
516 throw new IllegalStateException();
518 TransactionManager tm = manager.getCache("transactional-type")
519 .getAdvancedCache().getTransactionManager();
521 throw new IllegalStateException();
/**
 * Returns the transaction reported by the transaction manager of the
 * "transactional-type" cache.
 * NOTE(review): lines between obtaining the transaction manager and the
 * return are missing from this listing (possibly a null check) - confirm.
 *
 * @return the result of {@code getTransaction()} on that transaction manager
 * @throws IllegalStateException if no cache manager is available
 */
527 public Transaction tgetTransaction() throws SystemException {
528 EmbeddedCacheManager manager = this.cm;
529 if (manager == null) {
530 throw new IllegalStateException();
532 TransactionManager tm = manager.getCache("transactional-type")
533 .getAdvancedCache().getTransactionManager();
537 return tm.getTransaction();
/**
 * Tells whether this node is a standby, i.e. not the cluster coordinator.
 *
 * @return the negation of the cache manager's {@code isCoordinator()}.
 *         NOTE(review): the value returned without a cache manager is on a
 *         line missing from this listing; the comment below suggests standby.
 */
541 public boolean amIStandby() {
542 EmbeddedCacheManager manager = this.cm;
543 if (manager == null) {
544 // In case we cannot fetch the information, lets assume we
545 // are standby, so to have less responsibility.
548 return (!manager.isCoordinator());
/**
 * Resolves an Infinispan cluster address to an {@link InetAddress} by asking
 * the JGroups channel for the member's physical address. Only JGroups
 * transports and JGroups addresses are handled.
 * NOTE(review): the return statements are on lines missing from this listing.
 *
 * @param a cluster address to resolve
 */
551 private InetAddress addressToInetAddress(Address a) {
552 EmbeddedCacheManager manager = this.cm;
553 if ((manager == null) || (a == null)) {
554 // Without a cache manager or an address there is nothing to
555 // resolve.
558 Transport t = manager.getTransport();
559 if (t instanceof JGroupsTransport) {
560 JGroupsTransport jt = (JGroupsTransport) t;
561 Channel c = jt.getChannel();
562 if (a instanceof JGroupsAddress) {
563 JGroupsAddress ja = (JGroupsAddress) a;
// Send a GET_PHYSICAL_ADDRESS event down the channel for this member.
564 org.jgroups.Address phys = (org.jgroups.Address) c
565 .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
566 .getJGroupsAddress()));
// NOTE(review): the rest of this statement is on a missing line.
567 if (phys instanceof org.jgroups.stack.IpAddress) {
568 InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
/**
 * Lists the IP addresses of the current cluster members.
 *
 * @return the resolved address of every member reported by the cache manager,
 *         leaving out members that cannot be resolved and members whose host
 *         address equals {@code loopbackAddress}.
 *         NOTE(review): the values returned without a cache manager or with
 *         an empty member list are on lines missing from this listing.
 */
578 public List<InetAddress> getClusteredControllers() {
579 EmbeddedCacheManager manager = this.cm;
580 if (manager == null) {
583 List<Address> controllers = manager.getMembers();
584 if ((controllers == null) || controllers.size() == 0) {
588 List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
589 for (Address a : controllers) {
590 InetAddress inetAddress = addressToInetAddress(a);
591 if (inetAddress != null
592 && !inetAddress.getHostAddress().equals(loopbackAddress)) {
593 clusteredControllers.add(inetAddress);
596 return clusteredControllers;
/**
 * Resolves this node's own cluster address to an {@link InetAddress}.
 *
 * @return {@code addressToInetAddress} applied to the cache manager's address.
 *         NOTE(review): the value returned without a cache manager is on a
 *         line missing from this listing.
 */
600 public InetAddress getMyAddress() {
601 EmbeddedCacheManager manager = this.cm;
602 if (manager == null) {
605 return addressToInetAddress(manager.getAddress());
/**
 * Resolves the cluster coordinator's address to an {@link InetAddress}.
 *
 * @return {@code addressToInetAddress} applied to the cache manager's
 *         coordinator address.
 *         NOTE(review): the value returned without a cache manager is on a
 *         line missing from this listing.
 */
609 public InetAddress getActiveAddress() {
610 EmbeddedCacheManager manager = this.cm;
611 if (manager == null) {
612 // No cache manager, so the coordinator address cannot be
613 // determined.
617 return addressToInetAddress(manager.getCoordinator());
/**
 * Registers a role-change subscriber. On first use this creates the
 * subscriber set and a {@code ViewChangedListener} sharing that set, and
 * attaches the listener to the cache manager.
 *
 * @param i subscriber to add
 * @throws ListenRoleChangeAddException if no cache manager is available
 */
621 public void listenRoleChange(IListenRoleChange i)
622 throws ListenRoleChangeAddException {
623 EmbeddedCacheManager manager = this.cm;
624 if (manager == null) {
625 // Without a cache manager there is nothing to attach the
626 // view-change listener to, so the request is rejected.
627 throw new ListenRoleChangeAddException();
630 if (this.roleChangeListeners == null) {
631 this.roleChangeListeners = new HashSet<IListenRoleChange>();
632 this.cacheManagerListener = new ViewChangedListener(
633 this.roleChangeListeners);
634 manager.addListener(this.cacheManagerListener);
637 if (this.roleChangeListeners != null) {
638 this.roleChangeListeners.add(i);
/**
 * Removes a role-change subscriber and, under the condition below, detaches
 * the view-change listener from the cache manager and resets both fields.
 *
 * @param i subscriber to remove
 */
643 public void unlistenRoleChange(IListenRoleChange i) {
644 EmbeddedCacheManager manager = this.cm;
645 if (manager == null) {
646 // No cache manager is available in this case.
647 // NOTE(review): the body of this branch is not visible in this listing.
651 if (this.roleChangeListeners != null) {
652 this.roleChangeListeners.remove(i);
// NOTE(review): part of this condition is on a missing line; presumably it
// checks that the subscriber set has become empty - confirm.
655 if ((this.roleChangeListeners != null && this.roleChangeListeners
657 && (this.cacheManagerListener != null)) {
658 manager.removeListener(this.cacheManagerListener);
659 this.cacheManagerListener = null;
660 this.roleChangeListeners = null;
/**
 * Cache-manager listener that forwards cluster view changes to the
 * registered role-change subscribers.
 * NOTE(review): the lines directly before the class and before viewChanged()
 * are missing from this listing; presumably they carry the Infinispan
 * listener annotations - confirm.
 */
665 public class ViewChangedListener {
// Same set instance as the one handed to the constructor, not a copy.
666 Set<IListenRoleChange> roleListeners;
668 public ViewChangedListener(Set<IListenRoleChange> s) {
669 this.roleListeners = s;
// Calls newActiveAvailable() on every subscriber currently in the set.
673 public void viewChanged(ViewChangedEvent e) {
674 for (IListenRoleChange i : this.roleListeners) {
675 i.newActiveAvailable();