3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.clustering.services_implementation.internal;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
26 import javax.transaction.HeuristicMixedException;
27 import javax.transaction.HeuristicRollbackException;
28 import javax.transaction.NotSupportedException;
29 import javax.transaction.RollbackException;
30 import javax.transaction.SystemException;
31 import javax.transaction.Transaction;
32 import javax.transaction.TransactionManager;
34 import org.infinispan.Cache;
35 import org.infinispan.configuration.cache.Configuration;
36 import org.infinispan.configuration.cache.ConfigurationBuilder;
37 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
38 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
39 import org.infinispan.configuration.parsing.ParserRegistry;
40 import org.infinispan.manager.DefaultCacheManager;
41 import org.infinispan.manager.EmbeddedCacheManager;
42 import org.infinispan.notifications.Listener;
43 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
44 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
45 import org.infinispan.remoting.transport.Address;
46 import org.infinispan.remoting.transport.Transport;
47 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
48 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
49 import org.jgroups.Channel;
50 import org.jgroups.Event;
51 import org.jgroups.stack.GossipRouter;
52 import org.opendaylight.controller.clustering.services.CacheConfigException;
53 import org.opendaylight.controller.clustering.services.CacheExistException;
54 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
55 import org.opendaylight.controller.clustering.services.IClusterServices;
56 import org.opendaylight.controller.clustering.services.IGetUpdates;
57 import org.opendaylight.controller.clustering.services.IListenRoleChange;
58 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
// Implementation of IClusterServices; the cache operations of this class go
// through the Infinispan cache manager held in the cm field.
62 public class ClusterManager implements IClusterServices {
63 protected static final Logger logger = LoggerFactory
64 .getLogger(ClusterManager.class);
// Assigned in start(); the service methods null-check it before use.
65 private DefaultCacheManager cm;
// Result of startGossiper(), assigned in start(); stopped and cleared on shutdown when non-null.
66 GossipRouter gossiper;
// Created lazily by listenRoleChange() and reset to null by unlistenRoleChange().
67 private HashSet<IListenRoleChange> roleChangeListeners;
// Cache-manager listener wrapping roleChangeListeners; created and dropped together with that set.
68 private ViewChangedListener cacheManagerListener;
// Members whose host address equals this value are left out by getClusteredControllers().
70 private static String loopbackAddress = "127.0.0.1";
73 * Start a JGroups GossipRouter if we are a supernode. The
74 * GossipRouter is nothing more than a simple
75 * rendezvous point. All the nodes that want to join the cluster
76 * will contact any of the rendezvous points, which introduce the
77 * new node to all the others. Once the meet-and-greet phase is over,
78 * the nodes will open a full mesh with the remaining n-1 nodes,
79 * so even if the GossipRouter goes down nothing is lost.
80 * NOTE: This function has the side effect of setting some of the
81 * JGroups configuration properties, because this function already
82 * retrieves some of the network capabilities of the
83 * device and so it's better not to do that again.
86 * @return GossipRouter
// Parses the "supernodes" system property, publishes the JGroups bind address
// ("jgroups.tcp.address") and the gossip host list
// ("jgroups.tcpgossip.initial_hosts") as system properties, and builds a
// GossipRouter when one of the listed supernodes resolves to an address of
// this machine. Within the visible lines the router is only constructed;
// start() is the place that calls GossipRouter.start().
88 private GossipRouter startGossiper() {
89 boolean amIGossipRouter = false;
90 Integer gossipRouterPortDefault = 12001;
91 Integer gossipRouterPort = gossipRouterPortDefault;
92 InetAddress gossipRouterAddress = null;
// NOTE(review): the default passed to getProperty is not visible in this excerpt.
93 String supernodes_list = System.getProperty("supernodes",
95 StringBuffer sanitized_supernodes_list = new StringBuffer();
96 List<InetAddress> myAddresses = new ArrayList<InetAddress>();
// Supernode entries are separated by ":".
98 StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
99 if (supernodes.hasMoreTokens()) {
100 // Populate the list of my addresses
// Walk every address of every local network interface.
102 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
103 while (e.hasMoreElements()) {
104 NetworkInterface n = e.nextElement();
105 Enumeration<InetAddress> ee = n.getInetAddresses();
106 while (ee.hasMoreElements()) {
107 InetAddress i = ee.nextElement();
111 } catch (SocketException se) {
112 logger.error("Cannot get the list of network interfaces");
// Examine each supernode entry in turn.
116 while (supernodes.hasMoreTokens()) {
117 String curr_supernode = supernodes.nextToken();
118 logger.debug("Examining supernode {}", curr_supernode);
// NOTE(review): the delimiter separating host and port is not visible in this excerpt.
119 StringTokenizer host_port = new StringTokenizer(curr_supernode,
123 Integer port_num = gossipRouterPortDefault;
// An entry may hold at most a host and a port.
124 if (host_port.countTokens() > 2) {
125 logger.error("Error parsing supernode {} proceed to the next one",
129 host = host_port.nextToken();
130 InetAddress hostAddr;
132 hostAddr = InetAddress.getByName(host);
133 } catch (UnknownHostException ue) {
134 logger.error("Host not known");
// The port is optional; an unparsable or out-of-range value falls back to the default port.
137 if (host_port.hasMoreTokens()) {
138 port = host_port.nextToken();
140 port_num = Integer.valueOf(port);
141 } catch (NumberFormatException ne) {
143 .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
144 port_num = gossipRouterPortDefault;
146 if ((port_num > 65535) || (port_num < 0)) {
148 .error("Supplied supernode gossip port is outside a valid TCP port range");
149 port_num = gossipRouterPortDefault;
// Until a match is found, compare this supernode against every local address;
// a match makes this node a gossip router on that address and port.
152 if (!amIGossipRouter) {
154 for (InetAddress myAddr : myAddresses) {
155 if (myAddr.equals(hostAddr)) {
156 amIGossipRouter = true;
157 gossipRouterAddress = hostAddr;
158 gossipRouterPort = port_num;
// Entries of the sanitized list are comma-separated and begin with the resolved host address followed by "[".
164 if (!sanitized_supernodes_list.toString().equals("")) {
165 sanitized_supernodes_list.append(",");
167 sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
171 if (amIGossipRouter) {
172 // Set the Jgroups binding interface to the one we got
173 // from the supernodes attribute
174 if (gossipRouterAddress != null) {
175 System.setProperty("jgroups.tcp.address", gossipRouterAddress
179 // Set the Jgroup binding interface to the one we are well
180 // known outside or else to the first with non-local
// Start from the address of the local host name; when it is a loopback address,
// scan the interface addresses collected above instead.
183 String myBind = InetAddress.getLocalHost().getHostAddress();
185 || InetAddress.getLocalHost().isLoopbackAddress()) {
186 for (InetAddress myAddr : myAddresses) {
187 if (myAddr.isLoopbackAddress()
188 || myAddr.isLinkLocalAddress()) {
189 logger.debug("Skipping local address {}",
190 myAddr.getHostAddress());
193 // First non-local address
194 myBind = myAddr.getHostAddress();
195 logger.debug("First non-local address {}", myBind);
// A "jgroups.tcp.address" that is already set is left alone; otherwise use myBind, or 127.0.0.1 as a last resort.
200 String jgroupAddress = System
201 .getProperty("jgroups.tcp.address");
202 if (jgroupAddress == null) {
203 if (myBind != null) {
204 logger.debug("Set bind address to be {}", myBind);
205 System.setProperty("jgroups.tcp.address", myBind);
208 .debug("Set bind address to be LOCALHOST=127.0.0.1");
209 System.setProperty("jgroups.tcp.address", "127.0.0.1");
212 logger.debug("jgroup.tcp.address already set to be {}",
215 } catch (UnknownHostException uhe) {
217 .error("Met UnknownHostException while trying to get binding address for jgroups");
221 // The supernodes list constitute also the tcpgossip initial
223 System.setProperty("jgroups.tcpgossip.initial_hosts",
224 sanitized_supernodes_list.toString());
225 logger.debug("jgroups.tcp.address set to {}",
226 System.getProperty("jgroups.tcp.address"));
227 logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
228 System.getProperty("jgroups.tcpgossip.initial_hosts"));
// The result stays null unless this node was recognised as a supernode.
229 GossipRouter res = null;
230 if (amIGossipRouter) {
231 logger.info("I'm a GossipRouter will listen on port {}",
233 // Start a GossipRouter with JMX support
234 res = new GossipRouter(gossipRouterPort, null, true);
// Walks the cause chain of the given exception looking for a
// java.lang.SecurityException and logs a cluster authentication failure when
// one is found. The exception itself is not tested, only its causes.
// NOTE(review): the action taken after the log call is not visible in this
// excerpt; the method name suggests the process is terminated - confirm.
239 private void exitOnSecurityException(Exception ioe) {
240 Throwable cause = ioe.getCause();
241 while (cause != null) {
242 if (cause instanceof java.lang.SecurityException) {
243 logger.error("Failed Cluster authentication. Stopping Controller...");
// Move one level deeper in the cause chain.
246 cause = cause.getCause();
// Brings the clustering service up: optionally starts a GossipRouter, then
// parses the Infinispan configuration file and creates the cache manager
// stored in the cm field.
250 public void start() {
// startGossiper() also sets the jgroups.* system properties as a side effect.
251 this.gossiper = startGossiper();
252 if (this.gossiper != null) {
253 logger.debug("Trying to start Gossiper");
255 this.gossiper.start();
256 logger.info("Started GossipRouter");
// A GossipRouter failure is logged here.
257 } catch (Exception e) {
258 logger.error("GossipRouter didn't start. Exception Stack Trace",
262 logger.info("Starting the ClusterManager");
264 ParserRegistry parser = new ParserRegistry(this.getClass()
// The configuration file comes from the "org.infinispan.config.file" system
// property and defaults to config/infinispan-config.xml.
266 String infinispanConfigFile =
267 System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
268 logger.debug("Using configuration file:{}", infinispanConfigFile);
269 ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
// Install this bundle's ClassResolver on the global serialization configuration.
270 GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
271 globalBuilder.serialization()
272 .classResolver(new ClassResolver())
// NOTE(review): the second constructor argument presumably defers starting the
// manager until the explicit calls below - confirm against the DefaultCacheManager API.
274 this.cm = new DefaultCacheManager(holder, false);
275 logger.debug("Allocated ClusterManager");
276 if (this.cm != null) {
278 this.cm.startCache();
279 logger.debug("Started the ClusterManager");
// Any failure while configuring or starting Infinispan is logged with its stack
// trace and then inspected for an authentication (security) cause.
281 } catch (Exception ioe) {
282 logger.error("Cannot configure infinispan .. bailing out ");
283 logger.error("Stack Trace that raised th exception");
284 logger.error("",ioe);
286 exitOnSecurityException(ioe);
289 logger.debug("Cache Manager has value {}", this.cm);
// NOTE(review): the signature enclosing these statements is not visible in
// this excerpt; the log text indicates this is the shutdown path - confirm.
293 logger.info("Stopping the ClusterManager");
294 if (this.cm != null) {
295 logger.info("Found a valid ClusterManager, now let it be stopped");
// NOTE(review): how the cache manager itself is stopped is not visible here.
// When a GossipRouter exists, stop it and drop the reference.
299 if (this.gossiper != null) {
300 this.gossiper.stop();
301 this.gossiper = null;
// Defines a new cache for the given container and cache name, configured from
// the flags in cMode, and obtains it from the cache manager.
// Throws CacheExistException when a cache with the derived name already
// exists, and CacheConfigException when cMode holds both TRANSACTIONAL and
// NON_TRANSACTIONAL, or both SYNC and ASYNC.
306 public ConcurrentMap<?, ?> createCache(String containerName,
307 String cacheName, Set<cacheMode> cMode) throws CacheExistException,
308 CacheConfigException {
309 EmbeddedCacheManager manager = this.cm;
310 Cache<Object,Object> c;
// Caches are namespaced per container; the other cache methods of this class build the name the same way.
311 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
// NOTE(review): the outcome when no cache manager is available is not visible in this excerpt.
312 if (manager == null) {
316 if (manager.cacheExists(realCacheName)) {
317 throw new CacheExistException();
320 // Sanity check to avoid contrasting parameters between transactional
322 if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
323 IClusterServices.cacheMode.TRANSACTIONAL))) {
324 throw new CacheConfigException();
327 // Sanity check to avoid contrasting parameters between sync and async
328 if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
329 throw new CacheConfigException();
// Template configuration the new cache is derived from; stays null when
// neither transactional flag is present in cMode.
332 Configuration fromTemplateConfig = null;
334 * Fetch transactional/non-transactional templates
336 // Check if transactional
// TRANSACTIONAL uses the named "transactional-type" configuration,
// NON_TRANSACTIONAL uses the manager's default cache configuration.
337 if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
338 fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
339 } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
340 fromTemplateConfig = manager.getDefaultCacheConfiguration();
343 // If none set the transactional property then just return null
344 if (fromTemplateConfig == null) {
// Copy the template into a fresh builder so it can be adjusted per cache.
348 ConfigurationBuilder builder = new ConfigurationBuilder();
349 builder.read(fromTemplateConfig);
351 * Now evaluate async/sync
// NOTE(review): both branches derive the cache mode from the template's
// clustering settings; the conversion applied is not visible in this excerpt.
353 if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
355 .cacheMode(fromTemplateConfig.clustering()
358 } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
360 .cacheMode(fromTemplateConfig.clustering()
// Register the configuration under the cache name, then obtain the cache from the manager.
365 manager.defineConfiguration(realCacheName, builder.build());
366 c = manager.getCache(realCacheName);
// Looks up an existing cache by container and cache name; the cache is only
// fetched from the manager when cacheExists() reports it.
// NOTE(review): the values returned on the no-manager and no-such-cache paths
// are not visible in this excerpt.
371 public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
372 EmbeddedCacheManager manager = this.cm;
373 Cache<Object,Object> c;
374 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
375 if (manager == null) {
// Guarded lookup: the cache is requested only if it already exists.
379 if (manager.cacheExists(realCacheName)) {
380 c = manager.getCache(realCacheName);
// Removes the cache identified by container and cache name from the cache
// manager, when such a cache exists.
387 public void destroyCache(String containerName, String cacheName) {
388 EmbeddedCacheManager manager = this.cm;
389 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
390 if (manager == null) {
// Only an existing cache is removed.
393 if (manager.cacheExists(realCacheName)) {
394 manager.removeCache(realCacheName);
// Reports whether the cache manager knows a cache for the given container and
// cache name.
// NOTE(review): the value returned when no cache manager is available is not
// visible in this excerpt.
399 public boolean existCache(String containerName, String cacheName) {
400 EmbeddedCacheManager manager = this.cm;
402 if (manager == null) {
406 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
407 return manager.cacheExists(realCacheName);
// Collects the names (without the container prefix) of the running caches
// that belong to the given container.
// NOTE(review): the value returned when no cache manager is available is not
// visible in this excerpt.
411 public Set<String> getCacheList(String containerName) {
412 Set<String> perContainerCaches = new HashSet<String>();
413 EmbeddedCacheManager manager = this.cm;
414 if (manager == null) {
417 for (String cacheName : manager.getCacheNames()) {
// Caches that are defined but not running are ignored.
418 if (!manager.isRunning(cacheName)) continue;
419 if (cacheName.startsWith("{" + containerName + "}_")) {
// Splitting "{container}_{cache}" on braces yields
// ["", container, "_", cache], so index 3 is the bare cache name.
420 String[] res = cacheName.split("[{}]");
421 if (res.length >= 4 && res[1].equals(containerName)
422 && res[2].equals("_")) {
423 perContainerCaches.add(res[3]);
428 return (perContainerCaches);
// Builds a Properties object describing an existing cache: the string forms of
// its transaction, clustering and locking configuration, keyed by the
// corresponding IClusterServices.cacheProps names.
// NOTE(review): the values returned on the no-manager and no-such-cache paths
// are not visible in this excerpt.
432 public Properties getCacheProperties(String containerName, String cacheName) {
433 EmbeddedCacheManager manager = this.cm;
434 if (manager == null) {
437 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
438 if (!manager.cacheExists(realCacheName)) {
// Read the configuration of the live cache through its advanced interface.
441 Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
442 .getCacheConfiguration();
443 Properties p = new Properties();
444 p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
445 conf.transaction().toString());
446 p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
447 conf.clustering().toString());
448 p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
449 .locking().toString());
// Wraps the given update listener in a CacheListenerContainer for the named
// cache. Throws CacheListenerAddException when the cache does not exist.
// NOTE(review): the handling of a missing cache manager and the registration
// of the container on the cache are not visible in this excerpt.
454 public void addListener(String containerName, String cacheName,
455 IGetUpdates<?, ?> u) throws CacheListenerAddException {
456 EmbeddedCacheManager manager = this.cm;
457 Cache<Object,Object> c;
458 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
459 if (manager == null) {
463 if (!manager.cacheExists(realCacheName)) {
464 throw new CacheListenerAddException();
466 c = manager.getCache(realCacheName);
// The container carries the container and cache names along with the listener.
467 CacheListenerContainer cl = new CacheListenerContainer(u,
468 containerName, cacheName);
// Collects the IGetUpdates listeners attached to the named cache by unwrapping
// every CacheListenerContainer found among the cache's listeners.
// NOTE(review): the values returned on the no-manager and no-such-cache paths
// are not visible in this excerpt.
473 public Set<IGetUpdates<?, ?>> getListeners(String containerName,
475 EmbeddedCacheManager manager = this.cm;
476 Cache<Object,Object> c;
477 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
478 if (manager == null) {
482 if (!manager.cacheExists(realCacheName)) {
485 c = manager.getCache(realCacheName);
487 Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
488 Set<Object> listeners = c.getListeners();
489 for (Object listener : listeners) {
// Listeners of any other type are skipped.
490 if (listener instanceof CacheListenerContainer) {
491 CacheListenerContainer cl = (CacheListenerContainer) listener;
492 res.add(cl.whichListener());
// Detaches the given update listener from the named cache by removing the
// CacheListenerContainer that wraps it.
500 public void removeListener(String containerName, String cacheName,
501 IGetUpdates<?, ?> u) {
502 EmbeddedCacheManager manager = this.cm;
503 Cache<Object,Object> c;
504 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
505 if (manager == null) {
509 if (!manager.cacheExists(realCacheName)) {
512 c = manager.getCache(realCacheName);
514 Set<Object> listeners = c.getListeners();
515 for (Object listener : listeners) {
516 if (listener instanceof CacheListenerContainer) {
517 CacheListenerContainer cl = (CacheListenerContainer) listener;
// The wrapped listener is matched by reference identity, not equals().
518 if (cl.whichListener() == u) {
519 c.removeListener(listener);
// Transaction entry point backed by the transaction manager of the
// "transactional-type" cache. Throws IllegalStateException when no cache
// manager is available.
// NOTE(review): the call made on the transaction manager is not visible in
// this excerpt; the method name suggests begin() - confirm.
527 public void tbegin() throws NotSupportedException, SystemException {
528 EmbeddedCacheManager manager = this.cm;
529 if (manager == null) {
530 throw new IllegalStateException();
532 TransactionManager tm = manager.getCache("transactional-type")
533 .getAdvancedCache().getTransactionManager();
// NOTE(review): the condition guarding this throw is not visible in this excerpt.
535 throw new IllegalStateException();
// Transaction entry point backed by the transaction manager of the
// "transactional-type" cache. Throws IllegalStateException when no cache
// manager is available.
// NOTE(review): the call made on the transaction manager is not visible in
// this excerpt; the method name suggests commit() - confirm.
541 public void tcommit() throws RollbackException, HeuristicMixedException,
542 HeuristicRollbackException, java.lang.SecurityException,
543 java.lang.IllegalStateException, SystemException {
544 EmbeddedCacheManager manager = this.cm;
545 if (manager == null) {
546 throw new IllegalStateException();
548 TransactionManager tm = manager.getCache("transactional-type")
549 .getAdvancedCache().getTransactionManager();
// NOTE(review): the condition guarding this throw is not visible in this excerpt.
551 throw new IllegalStateException();
// Transaction entry point backed by the transaction manager of the
// "transactional-type" cache. Throws IllegalStateException when no cache
// manager is available.
// NOTE(review): the call made on the transaction manager is not visible in
// this excerpt; the method name suggests rollback() - confirm.
557 public void trollback() throws java.lang.IllegalStateException,
558 java.lang.SecurityException, SystemException {
559 EmbeddedCacheManager manager = this.cm;
560 if (manager == null) {
561 throw new IllegalStateException();
563 TransactionManager tm = manager.getCache("transactional-type")
564 .getAdvancedCache().getTransactionManager();
// NOTE(review): the condition guarding this throw is not visible in this excerpt.
566 throw new IllegalStateException();
// Returns the transaction reported by the transaction manager of the
// "transactional-type" cache. Throws IllegalStateException when no cache
// manager is available.
572 public Transaction tgetTransaction() throws SystemException {
573 EmbeddedCacheManager manager = this.cm;
574 if (manager == null) {
575 throw new IllegalStateException();
577 TransactionManager tm = manager.getCache("transactional-type")
578 .getAdvancedCache().getTransactionManager();
// NOTE(review): any check on tm before this call is not visible in this excerpt.
582 return tm.getTransaction();
// Reports whether this node is a standby, defined here as not being the
// coordinator according to the cache manager.
586 public boolean amIStandby() {
587 EmbeddedCacheManager manager = this.cm;
588 if (manager == null) {
589 // In case we cannot fetch the information, lets assume we
590 // are standby, so to have less responsibility.
// Standby is simply the negation of the coordinator role.
593 return (!manager.isCoordinator());
// Translates an Infinispan cluster Address into an InetAddress by asking the
// JGroups channel for the physical address behind it. The translation only
// proceeds for a JGroupsTransport, a JGroupsAddress and an IpAddress result.
// NOTE(review): the values returned on the early-exit and fall-through paths
// are not visible in this excerpt.
596 private InetAddress addressToInetAddress(Address a) {
597 EmbeddedCacheManager manager = this.cm;
598 if ((manager == null) || (a == null)) {
599 // Without a cache manager or an address there is nothing
600 // to translate.
603 Transport t = manager.getTransport();
604 if (t instanceof JGroupsTransport) {
605 JGroupsTransport jt = (JGroupsTransport) t;
606 Channel c = jt.getChannel();
607 if (a instanceof JGroupsAddress) {
608 JGroupsAddress ja = (JGroupsAddress) a;
// Send a GET_PHYSICAL_ADDRESS event down the channel for the underlying JGroups address.
609 org.jgroups.Address phys = (org.jgroups.Address) c
610 .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
611 .getJGroupsAddress()));
// Only an IpAddress result is used.
612 if (phys instanceof org.jgroups.stack.IpAddress) {
613 InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
// Returns the InetAddress of every cluster member reported by the cache
// manager, leaving out members that cannot be translated and members whose
// host address equals loopbackAddress.
// NOTE(review): the values returned when there is no manager or no member are
// not visible in this excerpt.
623 public List<InetAddress> getClusteredControllers() {
624 EmbeddedCacheManager manager = this.cm;
625 if (manager == null) {
628 List<Address> controllers = manager.getMembers();
629 if ((controllers == null) || controllers.size() == 0) {
633 List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
634 for (Address a : controllers) {
635 InetAddress inetAddress = addressToInetAddress(a);
// Skip untranslatable members and loopback entries.
636 if (inetAddress != null
637 && !inetAddress.getHostAddress().equals(loopbackAddress)) {
638 clusteredControllers.add(inetAddress);
641 return clusteredControllers;
// Returns the InetAddress corresponding to this node's own cluster address.
// NOTE(review): the value returned when no cache manager is available is not
// visible in this excerpt.
645 public InetAddress getMyAddress() {
646 EmbeddedCacheManager manager = this.cm;
647 if (manager == null) {
650 return addressToInetAddress(manager.getAddress());
// Returns the InetAddress of the current cluster coordinator.
// NOTE(review): the value returned when no cache manager is available is not
// visible in this excerpt.
654 public InetAddress getActiveAddress() {
655 EmbeddedCacheManager manager = this.cm;
656 if (manager == null) {
657 // Without a cache manager the coordinator
658 // cannot be determined.
662 return addressToInetAddress(manager.getCoordinator());
// Registers a listener to be notified on cluster view changes. The listener
// set and the single ViewChangedListener attached to the cache manager are
// created on first use. Throws ListenRoleChangeAddException when no cache
// manager is available.
666 public void listenRoleChange(IListenRoleChange i)
667 throws ListenRoleChangeAddException {
668 EmbeddedCacheManager manager = this.cm;
669 if (manager == null) {
670 // Without a cache manager there is nothing to attach the
671 // view-change listener to, so the registration is refused.
672 throw new ListenRoleChangeAddException();
// First registration: create the set and hook one ViewChangedListener,
// which shares that same set instance, onto the cache manager.
675 if (this.roleChangeListeners == null) {
676 this.roleChangeListeners = new HashSet<IListenRoleChange>();
677 this.cacheManagerListener = new ViewChangedListener(
678 this.roleChangeListeners);
679 manager.addListener(this.cacheManagerListener);
682 if (this.roleChangeListeners != null) {
683 this.roleChangeListeners.add(i);
// Unregisters a role-change listener and, under the condition below, detaches
// the ViewChangedListener from the cache manager and resets both fields.
688 public void unlistenRoleChange(IListenRoleChange i) {
689 EmbeddedCacheManager manager = this.cm;
690 if (manager == null) {
691 // No cache manager is available to unregister from.
692 // NOTE(review): the rest of this path is not visible in this excerpt.
696 if (this.roleChangeListeners != null) {
697 this.roleChangeListeners.remove(i);
// NOTE(review): part of this condition is not visible in this excerpt;
// presumably it tests that the listener set is now empty - confirm.
700 if ((this.roleChangeListeners != null && this.roleChangeListeners
702 && (this.cacheManagerListener != null)) {
703 manager.removeListener(this.cacheManagerListener);
704 this.cacheManagerListener = null;
705 this.roleChangeListeners = null;
// Cache-manager listener that fans a view change out to the registered
// IListenRoleChange instances.
// NOTE(review): the @Listener / @ViewChanged annotations imported by this file
// are presumably applied to this class and to viewChanged(), but they are not
// visible in this excerpt - confirm.
710 public class ViewChangedListener {
// Same set instance that ClusterManager adds to and removes from.
// NOTE(review): no synchronization around this set is visible here - confirm threading.
711 Set<IListenRoleChange> roleListeners;
713 public ViewChangedListener(Set<IListenRoleChange> s) {
714 this.roleListeners = s;
// Tells every registered listener that a new active node may be available;
// the event itself is not inspected.
718 public void viewChanged(ViewChangedEvent e) {
719 for (IListenRoleChange i : this.roleListeners) {
720 i.newActiveAvailable();