3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.clustering.services_implementation.internal;
12 import java.io.PrintWriter;
13 import java.io.StringWriter;
14 import java.net.InetAddress;
15 import java.net.NetworkInterface;
16 import java.net.SocketException;
17 import java.net.UnknownHostException;
18 import java.util.ArrayList;
19 import java.util.EnumSet;
20 import java.util.Enumeration;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Properties;
25 import java.util.StringTokenizer;
26 import java.util.concurrent.ConcurrentMap;
28 import javax.transaction.HeuristicMixedException;
29 import javax.transaction.HeuristicRollbackException;
30 import javax.transaction.NotSupportedException;
31 import javax.transaction.RollbackException;
32 import javax.transaction.SystemException;
33 import javax.transaction.Transaction;
34 import javax.transaction.TransactionManager;
36 import org.infinispan.Cache;
37 import org.infinispan.configuration.cache.Configuration;
38 import org.infinispan.manager.DefaultCacheManager;
39 import org.infinispan.manager.EmbeddedCacheManager;
40 import org.infinispan.notifications.Listener;
41 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
42 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
43 import org.infinispan.remoting.transport.Address;
44 import org.infinispan.remoting.transport.Transport;
45 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
46 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
47 import org.jgroups.Channel;
48 import org.jgroups.Event;
49 import org.jgroups.stack.GossipRouter;
50 import org.opendaylight.controller.clustering.services.CacheConfigException;
51 import org.opendaylight.controller.clustering.services.CacheExistException;
52 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
53 import org.opendaylight.controller.clustering.services.IClusterServices;
54 import org.opendaylight.controller.clustering.services.IGetUpdates;
55 import org.opendaylight.controller.clustering.services.IListenRoleChange;
56 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
/**
 * Implementation of {@link IClusterServices} on top of an embedded
 * Infinispan cache manager: it serves named caches, cache listeners,
 * transaction demarcation and cluster membership/role queries.
 */
60 public class ClusterManager implements IClusterServices {
// Class-wide SLF4J logger.
61 protected static final Logger logger = LoggerFactory
62 .getLogger(ClusterManager.class);
// Embedded cache manager; assigned in start() and null-checked by every
// operation before use.
63 private DefaultCacheManager cm;
// Result of startGossiper(); start() only starts it when it is non-null.
64 GossipRouter gossiper;
// Role-change listeners; created lazily by listenRoleChange() and reset to
// null by unlistenRoleChange().
65 private HashSet<IListenRoleChange> roleChangeListeners;
// View listener registered on the cache manager on behalf of
// roleChangeListeners.
66 private ViewChangedListener cacheManagerListener;
// Textual loopback address; getClusteredControllers() leaves out members
// whose host address equals it.
68 private static String loopbackAddress = "127.0.0.1";
71 * Start a JGroups GossipRouter if we are a supernode. The
72 * GossipRouter is nothing more than a simple
73 * rendezvous point. All the nodes that want to join the cluster
74 * will come to any of the rendezvous points, which introduce the
75 * nodes to all the others. Once the meet-and-greet phase is over,
76 * the nodes will open a full mesh with the remaining n-1 nodes,
77 * so even if the GossipRouter goes down nothing is lost.
78 * NOTE: This function has the side effect of setting some of the
79 * JGroups configuration properties, because this function already
80 * retrieves some of the network capabilities of the
81 * device and so it's better not to do that again.
84 * @return GossipRouter
86 private GossipRouter startGossiper() {
// Overview of the visible steps:
//  1. read and tokenize the "supernodes" system property;
//  2. collect the local addresses and compare each supernode with them
//     to decide whether this node is itself a gossip router;
//  3. set the jgroups.tcp.address and jgroups.tcpgossip.initial_hosts
//     system properties;
//  4. create a GossipRouter when this node is a supernode.
// NOTE(review): several statements of this method are not visible here
// (e.g. the default of the property lookup and the final return).
87 boolean amIGossipRouter = false;
// Port used whenever a supernode entry carries no usable port.
88 Integer gossipRouterPortDefault = 12001;
89 Integer gossipRouterPort = gossipRouterPortDefault;
90 InetAddress gossipRouterAddress = null;
91 String supernodes_list = System.getProperty("supernodes",
93 StringBuffer sanitized_supernodes_list = new StringBuffer();
94 List<InetAddress> myAddresses = new ArrayList<InetAddress>();
// Supernode entries are separated by ':' in the property value.
96 StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
97 if (supernodes.hasMoreTokens()) {
98 // Populate the list of my addresses
100 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
101 while (e.hasMoreElements()) {
102 NetworkInterface n = (NetworkInterface) e.nextElement();
103 Enumeration<InetAddress> ee = n.getInetAddresses();
104 while (ee.hasMoreElements()) {
105 InetAddress i = (InetAddress) ee.nextElement();
109 } catch (SocketException se) {
110 logger.error("Cannot get the list of network interfaces");
// Examine every supernode entry in turn.
114 while (supernodes.hasMoreTokens()) {
115 String curr_supernode = supernodes.nextToken();
116 logger.debug("Examining supernode " + curr_supernode);
117 StringTokenizer host_port = new StringTokenizer(curr_supernode,
121 Integer port_num = gossipRouterPortDefault;
// An entry with more than two tokens is reported as malformed.
122 if (host_port.countTokens() > 2) {
123 logger.error("Error parsing supernode " + curr_supernode
124 + " proceed to the next one");
127 host = host_port.nextToken();
128 InetAddress hostAddr;
130 hostAddr = InetAddress.getByName(host);
131 } catch (UnknownHostException ue) {
132 logger.error("Host not known");
// The optional second token is the port.
135 if (host_port.hasMoreTokens()) {
136 port = host_port.nextToken();
138 port_num = Integer.valueOf(port);
139 } catch (NumberFormatException ne) {
141 .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
142 port_num = gossipRouterPortDefault;
// Ports outside 0..65535 are replaced by the default port.
144 if ((port_num > 65535) || (port_num < 0)) {
146 .error("Supplied supernode gossip port is outside a valid TCP port range");
147 port_num = gossipRouterPortDefault;
// Only look for a local match while none has been found yet, so the first
// supernode that equals one of our addresses defines the router endpoint.
150 if (!amIGossipRouter) {
152 for (InetAddress myAddr : myAddresses) {
153 if (myAddr.equals(hostAddr)) {
154 amIGossipRouter = true;
155 gossipRouterAddress = hostAddr;
156 gossipRouterPort = port_num;
// Entries of the sanitized list are comma-separated.
162 if (!sanitized_supernodes_list.toString().equals("")) {
163 sanitized_supernodes_list.append(",");
165 sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
169 if (amIGossipRouter) {
170 // Set the Jgroups binding interface to the one we got
171 // from the supernodes attribute
172 if (gossipRouterAddress != null) {
173 System.setProperty("jgroups.tcp.address", gossipRouterAddress
177 // Set the Jgroup binding interface to the one we are well
178 // known outside or else to the first with non-local
181 String myBind = InetAddress.getLocalHost().getHostAddress();
183 || InetAddress.getLocalHost().isLoopbackAddress()) {
184 for (InetAddress myAddr : myAddresses) {
185 if (myAddr.isLoopbackAddress()
186 || myAddr.isLinkLocalAddress()) {
187 logger.debug("Skipping local address "
188 + myAddr.getHostAddress());
191 // First non-local address
192 myBind = myAddr.getHostAddress();
193 logger.debug("First non-local address " + myBind);
198 String jgroupAddress = System
199 .getProperty("jgroups.tcp.address");
// A jgroups.tcp.address that is already set is left untouched; otherwise
// the computed bind address is used.
200 if (jgroupAddress == null) {
201 if (myBind != null) {
202 logger.debug("Set bind address to be " + myBind);
203 System.setProperty("jgroups.tcp.address", myBind);
206 .debug("Set bind address to be LOCALHOST=127.0.0.1");
207 System.setProperty("jgroups.tcp.address", "127.0.0.1");
210 logger.debug("jgroup.tcp.address already set to be "
213 } catch (UnknownHostException uhe) {
215 .error("Met UnknownHostException while trying to get binding address for jgroups");
219 // The supernodes list constitute also the tcpgossip initial
221 System.setProperty("jgroups.tcpgossip.initial_hosts",
222 sanitized_supernodes_list.toString());
223 logger.debug("jgroups.tcp.address set to "
224 + System.getProperty("jgroups.tcp.address"));
225 logger.debug("jgroups.tcpgossip.initial_hosts set to "
226 + System.getProperty("jgroups.tcpgossip.initial_hosts"));
// A router instance is only created when this node is a supernode;
// otherwise res keeps its null value.
227 GossipRouter res = null;
228 if (amIGossipRouter) {
229 logger.info("I'm a GossipRouter will listen on port "
231 res = new GossipRouter(gossipRouterPort);
/**
 * Brings the cluster services up: runs startGossiper(), starts the
 * returned GossipRouter when there is one, then creates the cache manager
 * from "/config/infinispan-config.xml" and calls startCache() on it.
 * Both steps catch Exception and log it.
 * NOTE(review): whether anything is rethrown after logging is not visible
 * here - confirm.
 */
236 public void start() {
237 this.gossiper = startGossiper();
// A null gossiper means there is no router to start on this node.
238 if (this.gossiper != null) {
239 logger.debug("Trying to start Gossiper");
241 this.gossiper.start();
242 logger.info("Started GossipRouter");
243 } catch (Exception e) {
244 logger.error("GossipRouter didn't start exception " + e
246 logger.error("Stack Trace that raised the exception");
250 logger.info("Starting the ClusterManager");
252 //FIXME keeps throwing FileNotFoundException
253 this.cm = new DefaultCacheManager("/config/infinispan-config.xml");
254 logger.debug("Allocated ClusterManager");
255 if (this.cm != null) {
257 this.cm.startCache();
258 logger.debug("Started the ClusterManager");
// Any failure while configuring or starting Infinispan ends up here.
260 } catch (Exception ioe) {
261 logger.error("Cannot configure infinispan .. bailing out ");
262 logger.error("Stack Trace that raised th exception");
263 logger.error("",ioe);
267 logger.debug("Cache Manager has value " + this.cm);
// Shutdown path: the cache manager is handled first (only when one
// exists), then the GossipRouter is stopped and its reference cleared.
// NOTE(review): the statements acting on the cache manager are not visible
// here; the log message suggests it is stopped - confirm.
271 logger.info("Stopping the ClusterManager");
272 if (this.cm != null) {
273 logger.info("Found a valid ClusterManager, now let it be stopped");
// Stop the router only if this node created one.
277 if (this.gossiper != null) {
278 this.gossiper.stop();
279 this.gossiper = null;
/**
 * Creates a cache for the given container. The Infinispan cache name is
 * built as "{" + containerName + "}_{" + cacheName + "}".
 * NOTE(review): the handling of a missing cache manager and the return
 * statement are not visible here; presumably the cache obtained from the
 * manager is returned - confirm.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @param cMode requested cache modes
 * @throws CacheExistException if a cache with the composed name already
 *         exists
 * @throws CacheConfigException if cMode contains both NON_TRANSACTIONAL
 *         and TRANSACTIONAL
 */
284 public ConcurrentMap<?, ?> createCache(String containerName,
285 String cacheName, Set<cacheMode> cMode) throws CacheExistException,
286 CacheConfigException {
287 EmbeddedCacheManager manager = this.cm;
288 Cache<Object,Object> c;
289 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
290 if (manager == null) {
294 if (manager.cacheExists(realCacheName)) {
295 throw new CacheExistException();
298 // Sanity check to avoid contrasting parameters
299 if (cMode.containsAll(EnumSet.of(
300 IClusterServices.cacheMode.NON_TRANSACTIONAL,
301 IClusterServices.cacheMode.TRANSACTIONAL))) {
302 throw new CacheConfigException();
// Non-transactional: the cache is fetched by name directly.
305 if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
306 c = manager.getCache(realCacheName);
// Transactional: the configuration registered as "transactional-type" is
// defined under the new cache name before the cache is fetched.
308 } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
309 Configuration rc = manager
310 .getCacheConfiguration("transactional-type");
311 manager.defineConfiguration(realCacheName, rc);
312 c = manager.getCache(realCacheName);
/**
 * Looks up an existing cache by container and cache name, using the
 * composed name "{" + containerName + "}_{" + cacheName + "}". The cache
 * is only fetched from the manager when cacheExists() reports it.
 * NOTE(review): the return statements and the handling of a missing
 * manager or missing cache are not visible here - confirm.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 */
319 public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
320 EmbeddedCacheManager manager = this.cm;
321 Cache<Object,Object> c;
322 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
323 if (manager == null) {
// Checking first avoids getCache() being called for an unknown name.
327 if (manager.cacheExists(realCacheName)) {
328 c = manager.getCache(realCacheName);
/**
 * Removes the cache identified by container and cache name from the cache
 * manager, if such a cache exists.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 */
335 public void destroyCache(String containerName, String cacheName) {
336 EmbeddedCacheManager manager = this.cm;
337 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
338 if (manager == null) {
// Only an existing cache is removed.
341 if (manager.cacheExists(realCacheName)) {
342 manager.removeCache(realCacheName);
/**
 * Tells whether the cache identified by container and cache name is known
 * to the cache manager.
 * NOTE(review): the value returned when no cache manager is available is
 * not visible here - confirm.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @return the result of cacheExists() for the composed cache name
 */
347 public boolean existCache(String containerName, String cacheName) {
348 EmbeddedCacheManager manager = this.cm;
349 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
350 if (manager == null) {
353 return manager.cacheExists(realCacheName);
/**
 * Collects the names of the caches that belong to one container.
 * NOTE(review): the handling of a missing cache manager is not visible
 * here - confirm.
 *
 * @param containerName container whose caches are listed
 * @return the cache names with the container prefix removed
 */
357 public Set<String> getCacheList(String containerName) {
358 Set<String> perContainerCaches = new HashSet<String>();
359 EmbeddedCacheManager manager = this.cm;
360 if (manager == null) {
363 for (String cacheName : manager.getCacheNames()) {
364 if (cacheName.startsWith("{" + containerName + "}_")) {
// Splitting "{container}_{cache}" on braces gives
// ["", container, "_", cache], hence the indexes 1, 2 and 3 below.
365 String[] res = cacheName.split("[{}]");
366 if (res.length >= 4 && res[1].equals(containerName)
367 && res[2].equals("_")) {
368 perContainerCaches.add(res[3]);
373 return (perContainerCaches);
/**
 * Describes the configuration of a cache as a Properties object with
 * three entries: the string forms of its transaction, clustering and
 * locking configuration, keyed by the matching
 * IClusterServices.cacheProps names.
 * NOTE(review): the return statements, including those for a missing
 * manager or unknown cache, are not visible here - confirm.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 */
377 public Properties getCacheProperties(String containerName, String cacheName) {
378 EmbeddedCacheManager manager = this.cm;
379 if (manager == null) {
382 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
383 if (!manager.cacheExists(realCacheName)) {
// The configuration is read from the cache itself through its advanced
// cache view.
386 Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
387 .getCacheConfiguration();
388 Properties p = new Properties();
389 p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
390 conf.transaction().toString());
391 p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
392 conf.clustering().toString());
393 p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
394 .locking().toString());
/**
 * Registers an update listener for a cache. The listener is wrapped in a
 * CacheListenerContainer together with the container and cache names.
 * NOTE(review): the statement attaching the wrapper to the cache and the
 * handling of a missing cache manager are not visible here - confirm.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @param u listener to register
 * @throws CacheListenerAddException if the cache does not exist
 */
399 public void addListener(String containerName, String cacheName,
400 IGetUpdates<?, ?> u) throws CacheListenerAddException {
401 EmbeddedCacheManager manager = this.cm;
402 Cache<Object,Object> c;
403 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
404 if (manager == null) {
// A listener cannot be added to a cache that does not exist.
408 if (!manager.cacheExists(realCacheName)) {
409 throw new CacheListenerAddException();
411 c = manager.getCache(realCacheName);
412 CacheListenerContainer cl = new CacheListenerContainer(u,
413 containerName, cacheName);
/**
 * Collects the update listeners registered on a cache. Only listeners
 * that are CacheListenerContainer instances are considered; for each of
 * them the wrapped listener (whichListener()) is added to the result.
 * NOTE(review): the return statements, including those for a missing
 * manager or unknown cache, are not visible here - confirm.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 */
418 public Set<IGetUpdates<?, ?>> getListeners(String containerName,
420 EmbeddedCacheManager manager = this.cm;
421 Cache<Object,Object> c;
422 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
423 if (manager == null) {
427 if (!manager.cacheExists(realCacheName)) {
430 c = manager.getCache(realCacheName);
432 Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
433 Set<Object> listeners = c.getListeners();
434 for (Object listener : listeners) {
// Listeners of any other type are skipped.
435 if (listener instanceof CacheListenerContainer) {
436 CacheListenerContainer cl = (CacheListenerContainer) listener;
437 res.add(cl.whichListener());
/**
 * Unregisters an update listener from a cache by removing the
 * CacheListenerContainer that wraps it.
 * NOTE(review): the handling of a missing manager or unknown cache, and
 * whether the loop stops after the first removal, are not visible here -
 * confirm.
 *
 * @param containerName container the cache belongs to
 * @param cacheName name of the cache inside the container
 * @param u listener to unregister
 */
445 public void removeListener(String containerName, String cacheName,
446 IGetUpdates<?, ?> u) {
447 EmbeddedCacheManager manager = this.cm;
448 Cache<Object,Object> c;
449 String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
450 if (manager == null) {
454 if (!manager.cacheExists(realCacheName)) {
457 c = manager.getCache(realCacheName);
459 Set<Object> listeners = c.getListeners();
460 for (Object listener : listeners) {
461 if (listener instanceof CacheListenerContainer) {
462 CacheListenerContainer cl = (CacheListenerContainer) listener;
// Identity comparison: only the very same listener object matches.
463 if (cl.whichListener() == u) {
464 c.removeListener(listener);
/**
 * Transaction demarcation entry point: obtains the TransactionManager of
 * the cache named "transactional-type".
 * NOTE(review): the call made on the transaction manager is not visible
 * here (presumably begin()), and the second IllegalStateException
 * presumably guards a missing transaction manager - confirm.
 *
 * @throws IllegalStateException if no cache manager is available
 */
472 public void tbegin() throws NotSupportedException, SystemException {
473 EmbeddedCacheManager manager = this.cm;
474 if (manager == null) {
475 throw new IllegalStateException();
477 TransactionManager tm = manager.getCache("transactional-type")
478 .getAdvancedCache().getTransactionManager();
480 throw new IllegalStateException();
/**
 * Transaction demarcation entry point: obtains the TransactionManager of
 * the cache named "transactional-type".
 * NOTE(review): the call made on the transaction manager is not visible
 * here (presumably commit()), and the second IllegalStateException
 * presumably guards a missing transaction manager - confirm.
 *
 * @throws IllegalStateException if no cache manager is available
 */
486 public void tcommit() throws RollbackException, HeuristicMixedException,
487 HeuristicRollbackException, java.lang.SecurityException,
488 java.lang.IllegalStateException, SystemException {
489 EmbeddedCacheManager manager = this.cm;
490 if (manager == null) {
491 throw new IllegalStateException();
493 TransactionManager tm = manager.getCache("transactional-type")
494 .getAdvancedCache().getTransactionManager();
496 throw new IllegalStateException();
/**
 * Transaction demarcation entry point: obtains the TransactionManager of
 * the cache named "transactional-type".
 * NOTE(review): the call made on the transaction manager is not visible
 * here (presumably rollback()), and the second IllegalStateException
 * presumably guards a missing transaction manager - confirm.
 *
 * @throws IllegalStateException if no cache manager is available
 */
502 public void trollback() throws java.lang.IllegalStateException,
503 java.lang.SecurityException, SystemException {
504 EmbeddedCacheManager manager = this.cm;
505 if (manager == null) {
506 throw new IllegalStateException();
508 TransactionManager tm = manager.getCache("transactional-type")
509 .getAdvancedCache().getTransactionManager();
511 throw new IllegalStateException();
/**
 * Returns the transaction reported by the TransactionManager of the cache
 * named "transactional-type".
 * NOTE(review): lines between the lookup and the return are not visible
 * here (possibly a guard for a missing transaction manager) - confirm.
 *
 * @return the result of getTransaction() on that transaction manager
 * @throws IllegalStateException if no cache manager is available
 */
517 public Transaction tgetTransaction() throws SystemException {
518 EmbeddedCacheManager manager = this.cm;
519 if (manager == null) {
520 throw new IllegalStateException();
522 TransactionManager tm = manager.getCache("transactional-type")
523 .getAdvancedCache().getTransactionManager();
527 return tm.getTransaction();
/**
 * Tells whether this node is a standby, i.e. not the coordinator
 * according to the cache manager.
 *
 * @return the negation of isCoordinator() when a cache manager exists
 */
531 public boolean amIStandby() {
532 EmbeddedCacheManager manager = this.cm;
533 if (manager == null) {
534 // In case we cannot fetch the information, lets assume we
535 // are standby, so to have less responsibility.
538 return (!manager.isCoordinator());
/**
 * Translates an Infinispan cluster Address into an InetAddress by asking
 * the JGroups channel for the physical address behind it
 * (GET_PHYSICAL_ADDRESS event). The translation is only attempted when
 * the transport is a JGroupsTransport, the address is a JGroupsAddress
 * and the physical address is an IpAddress.
 * NOTE(review): the return statements are not visible here; presumably
 * the resolved address, or null when it cannot be resolved - confirm.
 *
 * @param a cluster address to translate
 */
541 private InetAddress addressToInetAddress(Address a) {
542 EmbeddedCacheManager manager = this.cm;
543 if ((manager == null) || (a == null)) {
544 // Without a cache manager or an address there is nothing to
545 // resolve.
548 Transport t = manager.getTransport();
549 if (t instanceof JGroupsTransport) {
550 JGroupsTransport jt = (JGroupsTransport) t;
551 Channel c = jt.getChannel();
552 if (a instanceof JGroupsAddress) {
553 JGroupsAddress ja = (JGroupsAddress) a;
// Ask the channel for the physical address of the JGroups address.
554 org.jgroups.Address phys = (org.jgroups.Address) c
555 .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
556 .getJGroupsAddress()));
557 if (phys instanceof org.jgroups.stack.IpAddress) {
558 InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
/**
 * Lists the addresses of the cluster members reported by the cache
 * manager. Members that cannot be translated to an InetAddress, and
 * members whose host address equals loopbackAddress, are left out.
 * NOTE(review): what is returned for a missing manager or an empty member
 * list is not visible here - confirm.
 *
 * @return the translated member addresses
 */
567 public List<InetAddress> getClusteredControllers() {
568 EmbeddedCacheManager manager = this.cm;
569 if (manager == null) {
572 List<Address> controllers = manager.getMembers();
573 if ((controllers == null) || controllers.size() == 0)
576 List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
577 for (Address a : controllers) {
578 InetAddress inetAddress = addressToInetAddress(a);
// Skip unresolved members and members bound to the loopback address.
579 if (inetAddress != null
580 && !inetAddress.getHostAddress().equals(loopbackAddress))
581 clusteredControllers.add(inetAddress);
583 return clusteredControllers;
/**
 * Returns the address of this node, i.e. the cache manager's own cluster
 * address translated by addressToInetAddress().
 * NOTE(review): the value returned when no cache manager is available is
 * not visible here - confirm.
 *
 * @return the translated address of this node
 */
586 public InetAddress getMyAddress() {
587 EmbeddedCacheManager manager = this.cm;
588 if (manager == null) {
591 return addressToInetAddress(manager.getAddress());
/**
 * Returns the address of the active node, i.e. the cluster coordinator
 * reported by the cache manager, translated by addressToInetAddress().
 * NOTE(review): the value returned when no cache manager is available is
 * not visible here - confirm.
 *
 * @return the translated address of the coordinator
 */
595 public InetAddress getActiveAddress() {
596 EmbeddedCacheManager manager = this.cm;
597 if (manager == null) {
598 // Without a cache manager the coordinator address cannot be
599 // looked up.
603 return addressToInetAddress(manager.getCoordinator());
/**
 * Registers a listener for role changes. On the first registration the
 * listener set and a ViewChangedListener sharing that set are created,
 * and the view listener is added to the cache manager.
 *
 * @param i listener to register
 * @throws ListenRoleChangeAddException if no cache manager is available
 */
607 public void listenRoleChange(IListenRoleChange i)
608 throws ListenRoleChangeAddException {
609 EmbeddedCacheManager manager = this.cm;
610 if (manager == null) {
611 // Without a cache manager there is nothing to attach the
612 // view listener to, so the registration is rejected.
613 throw new ListenRoleChangeAddException();
// Lazy setup: the set and the cache-manager listener are created together.
616 if (this.roleChangeListeners == null) {
617 this.roleChangeListeners = new HashSet<IListenRoleChange>();
618 this.cacheManagerListener = new ViewChangedListener(
619 this.roleChangeListeners);
620 manager.addListener(this.cacheManagerListener);
623 if (this.roleChangeListeners != null) {
624 this.roleChangeListeners.add(i);
/**
 * Unregisters a role-change listener. When the condition further down
 * holds, the view listener is removed from the cache manager and both
 * bookkeeping fields are reset to null.
 * NOTE(review): part of that condition is not visible here; presumably it
 * checks that the listener set has become empty - confirm.
 *
 * @param i listener to unregister
 */
629 public void unlistenRoleChange(IListenRoleChange i) {
630 EmbeddedCacheManager manager = this.cm;
631 if (manager == null) {
632 // Without a cache manager the view listener cannot be
633 // detached from it.
637 if (this.roleChangeListeners != null) {
638 this.roleChangeListeners.remove(i);
// Tear-down of the shared view listener and of the listener set.
641 if ((this.roleChangeListeners != null && this.roleChangeListeners
643 && (this.cacheManagerListener != null)) {
644 manager.removeListener(this.cacheManagerListener);
645 this.cacheManagerListener = null;
646 this.roleChangeListeners = null;
/**
 * Listener object that listenRoleChange() registers on the cache manager.
 * On a view change it calls newActiveAvailable() on every role-change
 * listener in its set.
 */
651 public class ViewChangedListener {
// The set is stored by reference, not copied, so listeners added to the
// caller's set afterwards are notified as well.
652 Set<IListenRoleChange> roleListeners;
654 public ViewChangedListener(Set<IListenRoleChange> s) {
655 this.roleListeners = s;
// Invoked with the view-change event; the event itself is not inspected
// in the visible lines.
659 public void viewChanged(ViewChangedEvent e) {
660 for (IListenRoleChange i : this.roleListeners) {
661 i.newActiveAvailable();