3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.clustering.services_implementation.internal;
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.TimeUnit;
27 import javax.transaction.HeuristicMixedException;
28 import javax.transaction.HeuristicRollbackException;
29 import javax.transaction.NotSupportedException;
30 import javax.transaction.RollbackException;
31 import javax.transaction.SystemException;
32 import javax.transaction.Transaction;
33 import javax.transaction.TransactionManager;
35 import org.infinispan.Cache;
36 import org.infinispan.configuration.cache.Configuration;
37 import org.infinispan.configuration.cache.ConfigurationBuilder;
38 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
39 import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
40 import org.infinispan.configuration.parsing.ParserRegistry;
41 import org.infinispan.manager.DefaultCacheManager;
42 import org.infinispan.manager.EmbeddedCacheManager;
43 import org.infinispan.notifications.Listener;
44 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
45 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
46 import org.infinispan.remoting.transport.Address;
47 import org.infinispan.remoting.transport.Transport;
48 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
49 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
50 import org.jgroups.Channel;
51 import org.jgroups.Event;
52 import org.jgroups.stack.GossipRouter;
53 import org.opendaylight.controller.clustering.services.CacheConfigException;
54 import org.opendaylight.controller.clustering.services.CacheExistException;
55 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
56 import org.opendaylight.controller.clustering.services.IClusterServices;
57 import org.opendaylight.controller.clustering.services.IGetUpdates;
58 import org.opendaylight.controller.clustering.services.IListenRoleChange;
59 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
// Implementation of IClusterServices backed by an Infinispan cache manager, with an
// optional JGroups GossipRouter. This span covers the class header and its fields only.
63 public class ClusterManager implements IClusterServices {
// SLF4J logger shared by all methods of this class.
64 protected static final Logger logger = LoggerFactory
65 .getLogger(ClusterManager.class);
// Infinispan cache manager; assigned in start() and null-checked by the methods that use it.
66 private DefaultCacheManager cm;
// Assigned from startGossiper() in start(); null-checked before being started or stopped.
67 GossipRouter gossiper;
// Role-change listeners; created lazily in listenRoleChange() and reset to null in
// unlistenRoleChange().
68 private HashSet<IListenRoleChange> roleChangeListeners;
// Cache-manager listener created and registered together with roleChangeListeners.
69 private ViewChangedListener cacheManagerListener;
// Textual form of the JVM loopback address; getClusteredControllers() filters out members
// whose address equals it.
71 private static String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
// Gossip port used by startGossiper() when a supernode entry supplies no usable port.
72 private static final int gossipRouterPortDefault = 12001;
73 // defaultTransactionTimeout is 60 seconds
// Passed to tbegin(long, TimeUnit) with TimeUnit.SECONDS and used there as the fallback timeout.
74 private static int DEFAULT_TRANSACTION_TIMEOUT = 60;
77 * Start a JGroups GossipRouter if we are a supernode. The
78 * GossipRouter is nothing more than a simple
79 * rendezvous point. All the nodes that want to join the cluster
80 * will come to any of the rendezvous points and they introduce the
81 * nodes to all the others. Once the meet-and-greet phase is over,
82 * the nodes will open a full mesh with the remaining n-1 nodes,
83 * so even if the GossipRouter goes down nothing is lost.
84 * NOTE: This function has the side effect of setting some of the
85 * JGroups configuration properties, because this function already
86 * retrieves some of the network capabilities of the
87 * device, so it is better not to do that again elsewhere.
90 * @return GossipRouter
// Parses the "supernodes" system property, decides whether this node is one of the listed
// supernodes, sets the JGroups-related system properties, and instantiates a GossipRouter
// when this node is a supernode. No start() call on the router is visible in this method;
// start() invokes it on the returned instance.
// NOTE(review): the embedded original line numbers skip values, so several statements
// (try openers, continue/return/break lines, else branches, closing braces, some
// continuation lines) are not visible here. Comments below describe only what is visible.
92 private GossipRouter startGossiper() {
93 boolean amIGossipRouter = false;
94 Integer gossipRouterPort = gossipRouterPortDefault;
95 InetAddress gossipRouterAddress = null;
// The default value passed to getProperty is on a line not visible here.
96 String supernodes_list = System.getProperty("supernodes",
// Rebuilt list of supernodes in "address[port]" form, comma separated (see appends below).
98 StringBuilder sanitized_supernodes_list = new StringBuilder();
99 List<InetAddress> myAddresses = new ArrayList<InetAddress>();
// Supernode entries in the property are separated by ':'.
101 StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
102 if (supernodes.hasMoreTokens()) {
103 // Populate the list of my addresses
// Walks every address of every local network interface.
// NOTE(review): the statement storing each address into myAddresses is not visible;
// presumably myAddresses.add(i) - confirm against the full file.
105 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
106 while (e.hasMoreElements()) {
107 NetworkInterface n = e.nextElement();
108 Enumeration<InetAddress> ee = n.getInetAddresses();
109 while (ee.hasMoreElements()) {
110 InetAddress i = ee.nextElement();
114 } catch (SocketException se) {
115 logger.error("Cannot get the list of network interfaces");
// Handles one supernode entry per iteration.
119 while (supernodes.hasMoreTokens()) {
120 String curr_supernode = supernodes.nextToken();
121 logger.debug("Examining supernode {}", curr_supernode);
// Splits the entry into host and optional port; the delimiter argument is not visible here.
122 StringTokenizer host_port = new StringTokenizer(curr_supernode,
126 Integer port_num = gossipRouterPortDefault;
// More than two tokens means the entry is malformed.
127 if (host_port.countTokens() > 2) {
128 logger.error("Error parsing supernode {} proceed to the next one",
132 host = host_port.nextToken();
133 InetAddress hostAddr;
// Resolves the host token; an unresolvable host is logged as an error.
135 hostAddr = InetAddress.getByName(host);
136 } catch (UnknownHostException ue) {
137 logger.error("Host {} is not known", host);
// Optional port token: falls back to the default port when it is not a number or is
// outside 0..65535.
140 if (host_port.hasMoreTokens()) {
141 port = host_port.nextToken();
143 port_num = Integer.valueOf(port);
144 } catch (NumberFormatException ne) {
145 logger.error("Supplied supernode gossip port is not recognized, using default gossip port {}",
146 gossipRouterPortDefault);
147 port_num = gossipRouterPortDefault;
149 if ((port_num > 65535) || (port_num < 0)) {
150 logger.error("Supplied supernode gossip port is outside a valid TCP port range");
151 port_num = gossipRouterPortDefault;
// Only evaluated until a match is found: if the supernode address equals one of the local
// addresses, this node is a gossip router and remembers that address and port.
154 if (!amIGossipRouter) {
156 for (InetAddress myAddr : myAddresses) {
157 if (myAddr.equals(hostAddr)) {
158 amIGossipRouter = true;
159 gossipRouterAddress = hostAddr;
160 gossipRouterPort = port_num;
// Appends this supernode as "address[port]", preceded by ',' when the list is non-empty.
166 if (!sanitized_supernodes_list.toString().equals("")) {
167 sanitized_supernodes_list.append(",");
169 sanitized_supernodes_list.append(hostAddr.getHostAddress()).append("[").append(port_num).append("]");
172 if (amIGossipRouter) {
173 // Set the Jgroups binding interface to the one we got
174 // from the supernodes attribute
175 if (gossipRouterAddress != null) {
// The value expression continues on a line not visible here.
176 System.setProperty("jgroups.tcp.address", gossipRouterAddress
// NOTE(review): the lines opening this alternative branch are not visible; presumably it is
// the else of "if (amIGossipRouter)" - confirm.
180 // Set the Jgroup binding interface to the one we are well
181 // known outside or else to the first with non-local
184 String myBind = InetAddress.getLocalHost().getHostAddress();
// Start of this condition is not visible; the visible part tests whether the local host
// address is a loopback address.
186 || InetAddress.getLocalHost().isLoopbackAddress()) {
// Scans the local addresses, skipping loopback and link-local ones.
187 for (InetAddress myAddr : myAddresses) {
188 if (myAddr.isLoopbackAddress()
189 || myAddr.isLinkLocalAddress()) {
190 logger.debug("Skipping local address {}",
191 myAddr.getHostAddress());
194 // First non-local address
195 myBind = myAddr.getHostAddress();
196 logger.debug("First non-local address {}", myBind);
// Only sets jgroups.tcp.address when it is not already defined as a system property.
201 String jgroupAddress = System
202 .getProperty("jgroups.tcp.address");
203 if (jgroupAddress == null) {
204 if (myBind != null) {
205 logger.debug("Set bind address to be {}", myBind);
206 System.setProperty("jgroups.tcp.address", myBind);
// Fallback bind address is the literal 127.0.0.1.
209 .debug("Set bind address to be LOCALHOST=127.0.0.1");
210 System.setProperty("jgroups.tcp.address", "127.0.0.1");
213 logger.debug("jgroup.tcp.address already set to be {}",
216 } catch (UnknownHostException uhe) {
218 .error("Met UnknownHostException while trying to get binding address for jgroups");
222 // The supernodes list constitute also the tcpgossip initial
// Publishes the sanitized supernode list as the jgroups.tcpgossip.initial_hosts property.
224 System.setProperty("jgroups.tcpgossip.initial_hosts",
225 sanitized_supernodes_list.toString());
226 logger.debug("jgroups.tcp.address set to {}",
227 System.getProperty("jgroups.tcp.address"));
228 logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
229 System.getProperty("jgroups.tcpgossip.initial_hosts"));
// Result stays null unless this node was identified as a supernode above.
// NOTE(review): the return statement is not visible; presumably res is returned.
230 GossipRouter res = null;
231 if (amIGossipRouter) {
232 logger.info("I'm a GossipRouter will listen on port {}",
234 // Start a GossipRouter with JMX support
235 res = new GossipRouter(gossipRouterPort, null, true);
/**
 * Walks the cause chain of the given exception looking for a
 * java.lang.SecurityException and logs an authentication failure when one is found.
 * The top-level exception itself is not tested, only its causes.
 *
 * NOTE(review): the statement(s) following the log call are not visible in this view;
 * the method name and log message suggest the process is terminated there - confirm.
 *
 * @param ioe exception whose causes are inspected
 */
240 private void exitOnSecurityException(Exception ioe) {
241 Throwable cause = ioe.getCause();
242 while (cause != null) {
243 if (cause instanceof java.lang.SecurityException) {
244 logger.error("Failed Cluster authentication. Stopping Controller...");
// Move one level deeper in the cause chain.
247 cause = cause.getCause();
/**
 * Starts the clustering service: obtains a GossipRouter from startGossiper() and starts
 * it when one is returned, then parses the Infinispan configuration file, creates the
 * DefaultCacheManager and starts its cache.
 *
 * NOTE(review): the try openers and several closing braces are not visible in this view.
 */
251 public void start() {
252 this.gossiper = startGossiper();
// A null gossiper means no router is started from this node.
253 if (this.gossiper != null) {
254 logger.debug("Trying to start Gossiper");
256 this.gossiper.start();
257 logger.info("Started GossipRouter");
// A failure to start the router is logged; no rethrow is visible here.
258 } catch (Exception e) {
259 logger.error("GossipRouter didn't start. Exception Stack Trace",
263 logger.info("Starting the ClusterManager");
// Constructor argument continues on a line not visible here.
265 ParserRegistry parser = new ParserRegistry(this.getClass()
// Configuration file path comes from the org.infinispan.config.file system property,
// defaulting to config/infinispan-config.xml.
267 String infinispanConfigFile =
268 System.getProperty("org.infinispan.config.file", "config/infinispan-config.xml");
269 logger.debug("Using configuration file:{}", infinispanConfigFile);
270 ConfigurationBuilderHolder holder = parser.parseFile(infinispanConfigFile);
// Installs a ClassResolver (declared elsewhere in the project) on the global serialization
// configuration; the rest of this call chain is not visible.
271 GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
272 globalBuilder.serialization()
273 .classResolver(new ClassResolver())
// Second constructor argument is false; the cache is then started explicitly via
// startCache() below.
275 this.cm = new DefaultCacheManager(holder, false);
276 logger.debug("Allocated ClusterManager");
277 if (this.cm != null) {
279 this.cm.startCache();
280 logger.debug("Started the ClusterManager");
// Any failure during configuration or startup is logged with its stack trace and then
// handed to exitOnSecurityException().
282 } catch (Exception ioe) {
283 logger.error("Cannot configure infinispan .. bailing out ");
284 logger.error("Stack Trace that raised th exception");
285 logger.error("",ioe);
287 exitOnSecurityException(ioe);
290 logger.debug("Cache Manager has value {}", this.cm);
// Shutdown logic for the cluster manager.
// NOTE(review): the method signature line is not visible in this view; the log message
// suggests this is the stop method. The statement(s) acting on this.cm inside the first
// "if" are also not visible.
294 logger.info("Stopping the ClusterManager");
295 if (this.cm != null) {
296 logger.info("Found a valid ClusterManager, now let it be stopped");
// Stops the GossipRouter if one exists and drops the reference.
300 if (this.gossiper != null) {
301 this.gossiper.stop();
302 this.gossiper = null;
/**
 * Defines and obtains a cache named "{containerName}_{cacheName}" using a configuration
 * derived from a template selected by the requested cache modes.
 *
 * NOTE(review): return statements and the body of the "manager == null" branch are not
 * visible in this view; parts of the async/sync builder call chains are also missing.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @param cMode requested modes (transactional/non-transactional, sync/async)
 * @throws CacheExistException if a cache with the composed name already exists
 * @throws CacheConfigException if cMode contains both TRANSACTIONAL and
 *         NON_TRANSACTIONAL, or both SYNC and ASYNC
 */
307 public ConcurrentMap<?, ?> createCache(String containerName,
308 String cacheName, Set<cacheMode> cMode) throws CacheExistException,
309 CacheConfigException {
310 EmbeddedCacheManager manager = this.cm;
311 Cache<Object,Object> c;
// Internal cache name combines container and cache name in a fixed "{..}_{..}" pattern.
312 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
313 if (manager == null) {
317 if (manager.cacheExists(realCacheName)) {
318 throw new CacheExistException();
321 // Sanity check to avoid contrasting parameters between transactional
323 if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
324 IClusterServices.cacheMode.TRANSACTIONAL))) {
325 throw new CacheConfigException();
328 // Sanity check to avoid contrasting parameters between sync and async
329 if (cMode.containsAll(EnumSet.of(IClusterServices.cacheMode.SYNC, IClusterServices.cacheMode.ASYNC))) {
330 throw new CacheConfigException();
333 Configuration fromTemplateConfig = null;
335 * Fetch transactional/non-transactional templates
337 // Check if transactional
// TRANSACTIONAL uses the configuration named "transactional-type"; NON_TRANSACTIONAL uses
// the manager's default cache configuration.
338 if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
339 fromTemplateConfig = manager.getCacheConfiguration("transactional-type");
340 } else if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
341 fromTemplateConfig = manager.getDefaultCacheConfiguration();
344 // If none set the transactional property then just return null
345 if (fromTemplateConfig == null) {
// Starts the new configuration as a copy of the chosen template.
349 ConfigurationBuilder builder = new ConfigurationBuilder();
350 builder.read(fromTemplateConfig);
352 * Now evaluate async/sync
// Both branches derive the cache mode from the template's clustering settings; the exact
// conversion applied is on lines not visible here.
354 if (cMode.contains(IClusterServices.cacheMode.ASYNC)) {
356 .cacheMode(fromTemplateConfig.clustering()
359 } else if (cMode.contains(IClusterServices.cacheMode.SYNC)) {
361 .cacheMode(fromTemplateConfig.clustering()
// Registers the built configuration under the composed name, then obtains the cache.
366 manager.defineConfiguration(realCacheName, builder.build());
367 c = manager.getCache(realCacheName);
/**
 * Looks up the cache named "{containerName}_{cacheName}" and fetches it from the cache
 * manager when it exists.
 *
 * NOTE(review): the return statements (including the results for a null manager or a
 * missing cache) are not visible in this view.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 */
372 public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
373 EmbeddedCacheManager manager = this.cm;
374 Cache<Object,Object> c;
375 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
376 if (manager == null) {
// Only asks the manager for the cache when it already exists.
380 if (manager.cacheExists(realCacheName)) {
381 c = manager.getCache(realCacheName);
/**
 * Removes the cache named "{containerName}_{cacheName}" from the cache manager when such
 * a cache exists.
 *
 * NOTE(review): the body of the "manager == null" branch is not visible in this view.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 */
388 public void destroyCache(String containerName, String cacheName) {
389 EmbeddedCacheManager manager = this.cm;
390 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
391 if (manager == null) {
// Removal is only attempted for an existing cache.
394 if (manager.cacheExists(realCacheName)) {
395 manager.removeCache(realCacheName);
/**
 * Reports whether the cache manager knows a cache named "{containerName}_{cacheName}".
 *
 * NOTE(review): the value returned when the manager is null is not visible in this view.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @return the result of cacheExists() on the composed name when a manager is available
 */
400 public boolean existCache(String containerName, String cacheName) {
401 EmbeddedCacheManager manager = this.cm;
403 if (manager == null) {
407 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
408 return manager.cacheExists(realCacheName);
/**
 * Collects the names of the running caches that belong to the given container.
 *
 * NOTE(review): the body of the "manager == null" branch is not visible in this view.
 *
 * @param containerName container whose caches are listed
 * @return set of cache names with the "{containerName}_" prefix and braces stripped
 */
412 public Set<String> getCacheList(String containerName) {
413 Set<String> perContainerCaches = new HashSet<String>();
414 EmbeddedCacheManager manager = this.cm;
415 if (manager == null) {
418 for (String cacheName : manager.getCacheNames()) {
// Caches that are not running are skipped.
419 if (!manager.isRunning(cacheName)) continue;
420 if (cacheName.startsWith("{" + containerName + "}_")) {
// Splitting "{container}_{cache}" on '{' and '}' yields ["", container, "_", cache],
// so index 1 is the container name and index 3 is the cache name.
421 String[] res = cacheName.split("[{}]");
422 if (res.length >= 4 && res[1].equals(containerName)
423 && res[2].equals("_")) {
424 perContainerCaches.add(res[3]);
429 return (perContainerCaches);
/**
 * Builds a Properties object describing the transaction, clustering and locking
 * configuration of the cache named "{containerName}_{cacheName}".
 *
 * NOTE(review): the return statements (including the results for a null manager or a
 * missing cache) are not visible in this view.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 */
433 public Properties getCacheProperties(String containerName, String cacheName) {
434 EmbeddedCacheManager manager = this.cm;
435 if (manager == null) {
438 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
439 if (!manager.cacheExists(realCacheName)) {
440 
/**
 * Prepares a CacheListenerContainer wrapping the given update listener for the cache
 * named "{containerName}_{cacheName}".
 *
 * NOTE(review): the statement registering the container on the cache is not visible in
 * this view (presumably c.addListener(cl)); the "manager == null" branch body is also
 * not visible.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @param u listener to be wrapped
 * @throws CacheListenerAddException if the cache does not exist
 */
455 public void addListener(String containerName, String cacheName,
456 IGetUpdates<?, ?> u) throws CacheListenerAddException {
457 EmbeddedCacheManager manager = this.cm;
458 Cache<Object,Object> c;
459 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
460 if (manager == null) {
464 if (!manager.cacheExists(realCacheName)) {
465 throw new CacheListenerAddException();
467 c = manager.getCache(realCacheName);
// The wrapper records the listener together with the container and cache names.
468 CacheListenerContainer cl = new CacheListenerContainer(u,
469 containerName, cacheName);
/**
 * Collects the IGetUpdates listeners wrapped by the CacheListenerContainer instances
 * registered on the cache named "{containerName}_{cacheName}".
 *
 * NOTE(review): the second parameter line and the return statements are not visible in
 * this view.
 *
 * @param containerName container the cache belongs to
 */
474 public Set<IGetUpdates<?, ?>> getListeners(String containerName,
476 EmbeddedCacheManager manager = this.cm;
477 Cache<Object,Object> c;
478 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
479 if (manager == null) {
483 if (!manager.cacheExists(realCacheName)) {
486 c = manager.getCache(realCacheName);
488 Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
489 Set<Object> listeners = c.getListeners();
// Listeners of any other type registered on the cache are ignored.
490 for (Object listener : listeners) {
491 if (listener instanceof CacheListenerContainer) {
492 CacheListenerContainer cl = (CacheListenerContainer) listener;
493 res.add(cl.whichListener());
/**
 * Removes from the cache named "{containerName}_{cacheName}" every
 * CacheListenerContainer that wraps the given listener.
 *
 * NOTE(review): the bodies of the "manager == null" and missing-cache branches are not
 * visible in this view.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @param u listener whose wrapper(s) are removed; matched by reference
 */
501 public void removeListener(String containerName, String cacheName,
502 IGetUpdates<?, ?> u) {
503 EmbeddedCacheManager manager = this.cm;
504 Cache<Object,Object> c;
505 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
506 if (manager == null) {
510 if (!manager.cacheExists(realCacheName)) {
513 c = manager.getCache(realCacheName);
515 Set<Object> listeners = c.getListeners();
516 for (Object listener : listeners) {
517 if (listener instanceof CacheListenerContainer) {
518 CacheListenerContainer cl = (CacheListenerContainer) listener;
// Identity comparison (==), not equals(): only the very same listener object matches.
519 if (cl.whichListener() == u) {
520 c.removeListener(listener);
/**
 * Convenience overload: delegates to tbegin(long, TimeUnit) with
 * DEFAULT_TRANSACTION_TIMEOUT expressed in seconds.
 *
 * @throws NotSupportedException declared by this method; propagated from the delegate
 * @throws SystemException declared by this method; propagated from the delegate
 */
528 public void tbegin() throws NotSupportedException, SystemException {
529 // call tbegin with the default timeout
530 tbegin(DEFAULT_TRANSACTION_TIMEOUT, TimeUnit.SECONDS);
/**
 * Configures the transaction timeout on the TransactionManager obtained from the
 * "transactional-type" cache.
 *
 * NOTE(review): the condition guarding the second IllegalStateException and the
 * statement that actually begins the transaction are not visible in this view.
 *
 * @param timeout timeout value, interpreted in the given unit
 * @param unit unit of the timeout value
 * @throws IllegalStateException if no cache manager is available
 */
534 public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException {
535 EmbeddedCacheManager manager = this.cm;
536 if (manager == null) {
537 throw new IllegalStateException();
539 TransactionManager tm = manager.getCache("transactional-type")
540 .getAdvancedCache().getTransactionManager();
542 throw new IllegalStateException();
// TimeUnit.toSeconds truncates, so a positive timeout shorter than one second becomes 0
// and takes the default-timeout branch below.
544 long timeoutSec = unit.toSeconds(timeout);
545 if((timeoutSec > Integer.MAX_VALUE) || (timeoutSec <= 0)) {
546 // fall back to the default timeout
547 tm.setTransactionTimeout(DEFAULT_TRANSACTION_TIMEOUT);
550 // as here we are sure that timeoutSec < = Integer.MAX_VALUE.
551 tm.setTransactionTimeout((int) timeoutSec);
/**
 * Obtains the TransactionManager of the "transactional-type" cache in order to commit.
 *
 * NOTE(review): the condition guarding the second IllegalStateException and the commit
 * call itself are not visible in this view.
 *
 * @throws IllegalStateException if no cache manager is available
 */
557 public void tcommit() throws RollbackException, HeuristicMixedException,
558 HeuristicRollbackException, java.lang.SecurityException,
559 java.lang.IllegalStateException, SystemException {
560 EmbeddedCacheManager manager = this.cm;
561 if (manager == null) {
562 throw new IllegalStateException();
564 TransactionManager tm = manager.getCache("transactional-type")
565 .getAdvancedCache().getTransactionManager();
567 throw new IllegalStateException();
/**
 * Obtains the TransactionManager of the "transactional-type" cache in order to roll back.
 *
 * NOTE(review): the condition guarding the second IllegalStateException and the rollback
 * call itself are not visible in this view.
 *
 * @throws IllegalStateException if no cache manager is available
 */
573 public void trollback() throws java.lang.IllegalStateException,
574 java.lang.SecurityException, SystemException {
575 EmbeddedCacheManager manager = this.cm;
576 if (manager == null) {
577 throw new IllegalStateException();
579 TransactionManager tm = manager.getCache("transactional-type")
580 .getAdvancedCache().getTransactionManager();
582 throw new IllegalStateException();
/**
 * Returns the transaction reported by the TransactionManager of the
 * "transactional-type" cache.
 *
 * NOTE(review): lines between obtaining the TransactionManager and the return are not
 * visible in this view (possibly a null check on tm).
 *
 * @return the result of getTransaction() on that TransactionManager
 * @throws IllegalStateException if no cache manager is available
 * @throws SystemException declared by this method
 */
588 public Transaction tgetTransaction() throws SystemException {
589 EmbeddedCacheManager manager = this.cm;
590 if (manager == null) {
591 throw new IllegalStateException();
593 TransactionManager tm = manager.getCache("transactional-type")
594 .getAdvancedCache().getTransactionManager();
598 return tm.getTransaction();
/**
 * Reports whether this node is a standby, i.e. not the coordinator according to the
 * cache manager.
 *
 * NOTE(review): the return statement of the "manager == null" branch is not visible in
 * this view; the existing comment states standby is assumed in that case.
 *
 * @return the negation of isCoordinator() when a cache manager is available
 */
602 public boolean amIStandby() {
603 EmbeddedCacheManager manager = this.cm;
604 if (manager == null) {
605 // In case we cannot fetch the information, lets assume we
606 // are standby, so to have less responsibility.
609 return (!manager.isCoordinator());
/**
 * Translates an Infinispan cluster Address into a java.net.InetAddress by asking the
 * JGroups channel for the physical address behind it.
 *
 * NOTE(review): the return statements and the tail of the last expression are not
 * visible in this view.
 *
 * @param a cluster address to translate
 */
612 private InetAddress addressToInetAddress(Address a) {
613 EmbeddedCacheManager manager = this.cm;
614 if ((manager == null) || (a == null)) {
// NOTE(review): the two comment lines below mention "standby" and look copied from
// amIStandby(); this method produces an InetAddress, not a role.
615 // In case we cannot fetch the information, lets assume we
616 // are standby, so to have less responsibility.
// Translation is only attempted for a JGroups transport and a JGroups address.
619 Transport t = manager.getTransport();
620 if (t instanceof JGroupsTransport) {
621 JGroupsTransport jt = (JGroupsTransport) t;
622 Channel c = jt.getChannel();
623 if (a instanceof JGroupsAddress) {
624 JGroupsAddress ja = (JGroupsAddress) a;
// Sends a GET_PHYSICAL_ADDRESS event down the channel for the wrapped JGroups address.
625 org.jgroups.Address phys = (org.jgroups.Address) c
626 .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
627 .getJGroupsAddress()));
// Only an IpAddress result is converted; the accessor used is on a line not visible here.
628 if (phys instanceof org.jgroups.stack.IpAddress) {
629 InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
/**
 * Lists the IP addresses of the current cluster members, as reported by the cache
 * manager.
 *
 * NOTE(review): the results for a null manager or an empty member list are on lines not
 * visible in this view.
 *
 * @return addresses of the members that could be translated and are not the loopback
 *         address
 */
639 public List<InetAddress> getClusteredControllers() {
640 EmbeddedCacheManager manager = this.cm;
641 if (manager == null) {
644 List<Address> controllers = manager.getMembers();
645 if ((controllers == null) || controllers.size() == 0) {
649 List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
650 for (Address a : controllers) {
651 InetAddress inetAddress = addressToInetAddress(a);
// Skips members that could not be translated and members bound to the loopback address.
652 if (inetAddress != null
653 && !inetAddress.getHostAddress().equals(loopbackAddress)) {
654 clusteredControllers.add(inetAddress);
657 return clusteredControllers;
/**
 * Returns this node's own cluster address translated to an InetAddress.
 *
 * NOTE(review): the value returned when the manager is null is not visible in this view.
 *
 * @return addressToInetAddress() applied to the cache manager's own address
 */
661 public InetAddress getMyAddress() {
662 EmbeddedCacheManager manager = this.cm;
663 if (manager == null) {
666 return addressToInetAddress(manager.getAddress());
/**
 * Returns the cluster coordinator's address translated to an InetAddress.
 *
 * NOTE(review): the value returned when the manager is null is not visible in this view.
 *
 * @return addressToInetAddress() applied to the cache manager's coordinator address
 */
670 public InetAddress getActiveAddress() {
671 EmbeddedCacheManager manager = this.cm;
672 if (manager == null) {
673 // In case we cannot fetch the information, lets assume we
674 // are standby, so to have less responsibility.
678 return addressToInetAddress(manager.getCoordinator());
/**
 * Registers a role-change listener. On the first registration the listener set and a
 * ViewChangedListener are created and the latter is added to the cache manager.
 *
 * @param i listener to register
 * @throws ListenRoleChangeAddException if no cache manager is available
 */
682 public void listenRoleChange(IListenRoleChange i)
683 throws ListenRoleChangeAddException {
684 EmbeddedCacheManager manager = this.cm;
685 if (manager == null) {
686 // In case we cannot fetch the information, lets assume we
687 // are standby, so to have less responsibility.
688 throw new ListenRoleChangeAddException();
// Lazy setup: the ViewChangedListener receives the same set instance that later
// registrations add to.
691 if (this.roleChangeListeners == null) {
692 this.roleChangeListeners = new HashSet<IListenRoleChange>();
693 this.cacheManagerListener = new ViewChangedListener(
694 this.roleChangeListeners);
695 manager.addListener(this.cacheManagerListener);
698 if (this.roleChangeListeners != null) {
699 this.roleChangeListeners.add(i);
/**
 * Unregisters a role-change listener and, when the teardown condition holds, removes
 * the ViewChangedListener from the cache manager and resets both fields to null.
 *
 * NOTE(review): the body of the "manager == null" branch and part of the teardown
 * condition are not visible in this view (presumably an emptiness check on the set).
 *
 * @param i listener to unregister
 */
704 public void unlistenRoleChange(IListenRoleChange i) {
705 EmbeddedCacheManager manager = this.cm;
706 if (manager == null) {
707 // In case we cannot fetch the information, lets assume we
708 // are standby, so to have less responsibility.
712 if (this.roleChangeListeners != null) {
713 this.roleChangeListeners.remove(i);
716 if ((this.roleChangeListeners != null && this.roleChangeListeners
718 && (this.cacheManagerListener != null)) {
// Teardown mirrors the lazy setup done in listenRoleChange().
719 manager.removeListener(this.cacheManagerListener);
720 this.cacheManagerListener = null;
721 this.roleChangeListeners = null;
// Listener object that the outer class registers on the cache manager; on a view change
// it notifies every role-change listener in the set it was given.
// NOTE(review): the lines directly before the class and the viewChanged method are not
// visible here; the file imports the Infinispan @Listener and @ViewChanged annotations,
// which are presumably applied on those lines - confirm.
726 public class ViewChangedListener {
// Stored by reference, so later changes made to the set by its owner are seen here.
727 Set<IListenRoleChange> roleListeners;
729 public ViewChangedListener(Set<IListenRoleChange> s) {
730 this.roleListeners = s;
// Calls newActiveAvailable() on each registered listener; the event itself is not inspected
// in the visible lines.
734 public void viewChanged(ViewChangedEvent e) {
735 for (IListenRoleChange i : this.roleListeners) {
736 i.newActiveAvailable();