Merge "When a node is going down, remove edges in both directions associated with...
[controller.git] / opendaylight / clustering / services_implementation / src / main / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusterManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.clustering.services_implementation.internal;
11
12 import java.net.InetAddress;
13 import java.net.NetworkInterface;
14 import java.net.SocketException;
15 import java.net.UnknownHostException;
16 import java.util.ArrayList;
17 import java.util.EnumSet;
18 import java.util.Enumeration;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Properties;
22 import java.util.Set;
23 import java.util.StringTokenizer;
24 import java.util.concurrent.ConcurrentMap;
25
26 import javax.transaction.HeuristicMixedException;
27 import javax.transaction.HeuristicRollbackException;
28 import javax.transaction.NotSupportedException;
29 import javax.transaction.RollbackException;
30 import javax.transaction.SystemException;
31 import javax.transaction.Transaction;
32 import javax.transaction.TransactionManager;
33
34 import org.infinispan.Cache;
35 import org.infinispan.configuration.cache.Configuration;
36 import org.infinispan.manager.DefaultCacheManager;
37 import org.infinispan.manager.EmbeddedCacheManager;
38 import org.infinispan.notifications.Listener;
39 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
40 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
41 import org.infinispan.remoting.transport.Address;
42 import org.infinispan.remoting.transport.Transport;
43 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
44 import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
45 import org.jgroups.Channel;
46 import org.jgroups.Event;
47 import org.jgroups.stack.GossipRouter;
48 import org.opendaylight.controller.clustering.services.CacheConfigException;
49 import org.opendaylight.controller.clustering.services.CacheExistException;
50 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
51 import org.opendaylight.controller.clustering.services.IClusterServices;
52 import org.opendaylight.controller.clustering.services.IGetUpdates;
53 import org.opendaylight.controller.clustering.services.IListenRoleChange;
54 import org.opendaylight.controller.clustering.services.ListenRoleChangeAddException;
55 import org.opendaylight.controller.sal.core.IContainerAware;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public class ClusterManager implements IClusterServices, IContainerAware {
60     protected static final Logger logger = LoggerFactory
61             .getLogger(ClusterManager.class);
62     private DefaultCacheManager cm;
63     GossipRouter gossiper;
64     private HashSet<IListenRoleChange> roleChangeListeners;
65     private ViewChangedListener cacheManagerListener;
66
67     private static String loopbackAddress = "127.0.0.1";
68
69     /**
70      * Start a JGroups GossipRouter if we are a supernode. The
71      * GosispRouter is nothing more than a simple
72      * rendevouz-pointer. All the nodes that wants to join the cluster
73      * will come to any of the rendevouz point and they introduce the
74      * nodes to all the others. Once the meet and greet phase if over,
75      * the nodes will open a full-mesh with the remaining n-1 nodes,
76      * so even if the GossipRouter goes down nothing is lost.
77      * NOTE: This function has the side effect to set some of the
78      * JGROUPS configurations, this because in this function already
79      * we try to retrieve some of the network capabilities of the
80      * device and so it's better not to do that again
81      *
82      *
83      * @return GossipRouter
84      */
85     private GossipRouter startGossiper() {
86         boolean amIGossipRouter = false;
87         Integer gossipRouterPortDefault = 12001;
88         Integer gossipRouterPort = gossipRouterPortDefault;
89         InetAddress gossipRouterAddress = null;
90         String supernodes_list = System.getProperty("supernodes",
91                 loopbackAddress);
92         StringBuffer sanitized_supernodes_list = new StringBuffer();
93         List<InetAddress> myAddresses = new ArrayList<InetAddress>();
94
95         StringTokenizer supernodes = new StringTokenizer(supernodes_list, ":");
96         if (supernodes.hasMoreTokens()) {
97             // Populate the list of my addresses
98             try {
99                 Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
100                 while (e.hasMoreElements()) {
101                     NetworkInterface n = e.nextElement();
102                     Enumeration<InetAddress> ee = n.getInetAddresses();
103                     while (ee.hasMoreElements()) {
104                         InetAddress i = ee.nextElement();
105                         myAddresses.add(i);
106                     }
107                 }
108             } catch (SocketException se) {
109                 logger.error("Cannot get the list of network interfaces");
110                 return null;
111             }
112         }
113         while (supernodes.hasMoreTokens()) {
114             String curr_supernode = supernodes.nextToken();
115             logger.debug("Examining supernode {}", curr_supernode);
116             StringTokenizer host_port = new StringTokenizer(curr_supernode,
117                     "[]");
118             String host;
119             String port;
120             Integer port_num = gossipRouterPortDefault;
121             if (host_port.countTokens() > 2) {
122                 logger.error("Error parsing supernode {} proceed to the next one",
123                         curr_supernode);
124                 continue;
125             }
126             host = host_port.nextToken();
127             InetAddress hostAddr;
128             try {
129                 hostAddr = InetAddress.getByName(host);
130             } catch (UnknownHostException ue) {
131                 logger.error("Host not known");
132                 continue;
133             }
134             if (host_port.hasMoreTokens()) {
135                 port = host_port.nextToken();
136                 try {
137                     port_num = Integer.valueOf(port);
138                 } catch (NumberFormatException ne) {
139                     logger
140                             .error("Supplied supernode gossiepr port is not recognized, using standard gossipport");
141                     port_num = gossipRouterPortDefault;
142                 }
143                 if ((port_num > 65535) || (port_num < 0)) {
144                     logger
145                             .error("Supplied supernode gossip port is outside a valid TCP port range");
146                     port_num = gossipRouterPortDefault;
147                 }
148             }
149             if (!amIGossipRouter) {
150                 if (host != null) {
151                     for (InetAddress myAddr : myAddresses) {
152                         if (myAddr.equals(hostAddr)) {
153                             amIGossipRouter = true;
154                             gossipRouterAddress = hostAddr;
155                             gossipRouterPort = port_num;
156                             break;
157                         }
158                     }
159                 }
160             }
161             if (!sanitized_supernodes_list.toString().equals("")) {
162                 sanitized_supernodes_list.append(",");
163             }
164             sanitized_supernodes_list.append(hostAddr.getHostAddress() + "["
165                     + port_num + "]");
166         }
167
168         if (amIGossipRouter) {
169             // Set the Jgroups binding interface to the one we got
170             // from the supernodes attribute
171             if (gossipRouterAddress != null) {
172                 System.setProperty("jgroups.tcp.address", gossipRouterAddress
173                         .getHostAddress());
174             }
175         } else {
176             // Set the Jgroup binding interface to the one we are well
177             // known outside or else to the first with non-local
178             // scope.
179             try {
180                 String myBind = InetAddress.getLocalHost().getHostAddress();
181                 if (myBind == null
182                         || InetAddress.getLocalHost().isLoopbackAddress()) {
183                     for (InetAddress myAddr : myAddresses) {
184                         if (myAddr.isLoopbackAddress()
185                                 || myAddr.isLinkLocalAddress()) {
186                             logger.debug("Skipping local address {}",
187                                          myAddr.getHostAddress());
188                             continue;
189                         } else {
190                             // First non-local address
191                             myBind = myAddr.getHostAddress();
192                             logger.debug("First non-local address {}", myBind);
193                             break;
194                         }
195                     }
196                 }
197                 String jgroupAddress = System
198                         .getProperty("jgroups.tcp.address");
199                 if (jgroupAddress == null) {
200                     if (myBind != null) {
201                         logger.debug("Set bind address to be {}", myBind);
202                         System.setProperty("jgroups.tcp.address", myBind);
203                     } else {
204                         logger
205                                 .debug("Set bind address to be LOCALHOST=127.0.0.1");
206                         System.setProperty("jgroups.tcp.address", "127.0.0.1");
207                     }
208                 } else {
209                     logger.debug("jgroup.tcp.address already set to be {}",
210                             jgroupAddress);
211                 }
212             } catch (UnknownHostException uhe) {
213                 logger
214                         .error("Met UnknownHostException while trying to get binding address for jgroups");
215             }
216         }
217
218         // The supernodes list constitute also the tcpgossip initial
219         // host list
220         System.setProperty("jgroups.tcpgossip.initial_hosts",
221                 sanitized_supernodes_list.toString());
222         logger.debug("jgroups.tcp.address set to {}",
223                 System.getProperty("jgroups.tcp.address"));
224         logger.debug("jgroups.tcpgossip.initial_hosts set to {}",
225                 System.getProperty("jgroups.tcpgossip.initial_hosts"));
226         GossipRouter res = null;
227         if (amIGossipRouter) {
228             logger.info("I'm a GossipRouter will listen on port {}",
229                     gossipRouterPort);
230             res = new GossipRouter(gossipRouterPort);
231         }
232         return res;
233     }
234
235     public void start() {
236         this.gossiper = startGossiper();
237         if (this.gossiper != null) {
238             logger.debug("Trying to start Gossiper");
239             try {
240                 this.gossiper.start();
241                 logger.info("Started GossipRouter");
242             } catch (Exception e) {
243                 logger.error("GossipRouter didn't start. Exception Stack Trace",
244                              e);
245             }
246         }
247         logger.info("Starting the ClusterManager");
248         try {
249             //FIXME keeps throwing FileNotFoundException
250             this.cm = new DefaultCacheManager("config/infinispan-config.xml");
251             logger.debug("Allocated ClusterManager");
252             if (this.cm != null) {
253                 this.cm.start();
254                 this.cm.startCache();
255                 logger.debug("Started the ClusterManager");
256             }
257         } catch (Exception ioe) {
258             logger.error("Cannot configure infinispan .. bailing out ");
259             logger.error("Stack Trace that raised th exception");
260             logger.error("",ioe);
261             this.cm = null;
262             this.stop();
263         }
264         logger.debug("Cache Manager has value {}", this.cm);
265     }
266
267     public void stop() {
268         logger.info("Stopping the ClusterManager");
269         if (this.cm != null) {
270             logger.info("Found a valid ClusterManager, now let it be stopped");
271             this.cm.stop();
272             this.cm = null;
273         }
274         if (this.gossiper != null) {
275             this.gossiper.stop();
276             this.gossiper = null;
277         }
278     }
279
280     @Override
281     public ConcurrentMap<?, ?> createCache(String containerName,
282             String cacheName, Set<cacheMode> cMode) throws CacheExistException,
283             CacheConfigException {
284         EmbeddedCacheManager manager = this.cm;
285         Cache<Object,Object> c;
286         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
287         if (manager == null) {
288             return null;
289         }
290
291         if (manager.cacheExists(realCacheName)) {
292             throw new CacheExistException();
293         }
294
295         // Sanity check to avoid contrasting parameters
296         if (cMode.containsAll(EnumSet.of(
297                 IClusterServices.cacheMode.NON_TRANSACTIONAL,
298                 IClusterServices.cacheMode.TRANSACTIONAL))) {
299             throw new CacheConfigException();
300         }
301
302         if (cMode.contains(IClusterServices.cacheMode.NON_TRANSACTIONAL)) {
303             c = manager.getCache(realCacheName);
304             return c;
305         } else if (cMode.contains(IClusterServices.cacheMode.TRANSACTIONAL)) {
306             Configuration rc = manager
307                     .getCacheConfiguration("transactional-type");
308             manager.defineConfiguration(realCacheName, rc);
309             c = manager.getCache(realCacheName);
310             return c;
311         }
312         return null;
313     }
314
315     @Override
316     public ConcurrentMap<?, ?> getCache(String containerName, String cacheName) {
317         EmbeddedCacheManager manager = this.cm;
318         Cache<Object,Object> c;
319         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
320         if (manager == null) {
321             return null;
322         }
323
324         if (manager.cacheExists(realCacheName)) {
325             c = manager.getCache(realCacheName);
326             return c;
327         }
328         return null;
329     }
330
331     @Override
332     public void destroyCache(String containerName, String cacheName) {
333         EmbeddedCacheManager manager = this.cm;
334         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
335         if (manager == null) {
336             return;
337         }
338         if (manager.cacheExists(realCacheName)) {
339             manager.removeCache(realCacheName);
340         }
341     }
342
343     @Override
344     public boolean existCache(String containerName, String cacheName) {
345         EmbeddedCacheManager manager = this.cm;
346         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
347         if (manager == null) {
348             return false;
349         }
350         return manager.cacheExists(realCacheName);
351     }
352
353     @Override
354     public Set<String> getCacheList(String containerName) {
355         Set<String> perContainerCaches = new HashSet<String>();
356         EmbeddedCacheManager manager = this.cm;
357         if (manager == null) {
358             return null;
359         }
360         for (String cacheName : manager.getCacheNames()) {
361             if (cacheName.startsWith("{" + containerName + "}_")) {
362                 String[] res = cacheName.split("[{}]");
363                 if (res.length >= 4 && res[1].equals(containerName)
364                         && res[2].equals("_")) {
365                     perContainerCaches.add(res[3]);
366                 }
367             }
368         }
369
370         return (perContainerCaches);
371     }
372
373     @Override
374     public Properties getCacheProperties(String containerName, String cacheName) {
375         EmbeddedCacheManager manager = this.cm;
376         if (manager == null) {
377             return null;
378         }
379         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
380         if (!manager.cacheExists(realCacheName)) {
381             return null;
382         }
383         Configuration conf = manager.getCache(realCacheName).getAdvancedCache()
384                 .getCacheConfiguration();
385         Properties p = new Properties();
386         p.setProperty(IClusterServices.cacheProps.TRANSACTION_PROP.toString(),
387                 conf.transaction().toString());
388         p.setProperty(IClusterServices.cacheProps.CLUSTERING_PROP.toString(),
389                 conf.clustering().toString());
390         p.setProperty(IClusterServices.cacheProps.LOCKING_PROP.toString(), conf
391                 .locking().toString());
392         return p;
393     }
394
395     @Override
396     public void addListener(String containerName, String cacheName,
397             IGetUpdates<?, ?> u) throws CacheListenerAddException {
398         EmbeddedCacheManager manager = this.cm;
399         Cache<Object,Object> c;
400         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
401         if (manager == null) {
402             return;
403         }
404
405         if (!manager.cacheExists(realCacheName)) {
406             throw new CacheListenerAddException();
407         }
408         c = manager.getCache(realCacheName);
409         CacheListenerContainer cl = new CacheListenerContainer(u,
410                 containerName, cacheName);
411         c.addListener(cl);
412     }
413
414     @Override
415     public Set<IGetUpdates<?, ?>> getListeners(String containerName,
416             String cacheName) {
417         EmbeddedCacheManager manager = this.cm;
418         Cache<Object,Object> c;
419         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
420         if (manager == null) {
421             return null;
422         }
423
424         if (!manager.cacheExists(realCacheName)) {
425             return null;
426         }
427         c = manager.getCache(realCacheName);
428
429         Set<IGetUpdates<?, ?>> res = new HashSet<IGetUpdates<?, ?>>();
430         Set<Object> listeners = c.getListeners();
431         for (Object listener : listeners) {
432             if (listener instanceof CacheListenerContainer) {
433                 CacheListenerContainer cl = (CacheListenerContainer) listener;
434                 res.add(cl.whichListener());
435             }
436         }
437
438         return res;
439     }
440
441     @Override
442     public void removeListener(String containerName, String cacheName,
443             IGetUpdates<?, ?> u) {
444         EmbeddedCacheManager manager = this.cm;
445         Cache<Object,Object> c;
446         String realCacheName = "{" + containerName + "}_{" + cacheName + "}";
447         if (manager == null) {
448             return;
449         }
450
451         if (!manager.cacheExists(realCacheName)) {
452             return;
453         }
454         c = manager.getCache(realCacheName);
455
456         Set<Object> listeners = c.getListeners();
457         for (Object listener : listeners) {
458             if (listener instanceof CacheListenerContainer) {
459                 CacheListenerContainer cl = (CacheListenerContainer) listener;
460                 if (cl.whichListener() == u) {
461                     c.removeListener(listener);
462                     return;
463                 }
464             }
465         }
466     }
467
468     @Override
469     public void tbegin() throws NotSupportedException, SystemException {
470         EmbeddedCacheManager manager = this.cm;
471         if (manager == null) {
472             throw new IllegalStateException();
473         }
474         TransactionManager tm = manager.getCache("transactional-type")
475                 .getAdvancedCache().getTransactionManager();
476         if (tm == null) {
477             throw new IllegalStateException();
478         }
479         tm.begin();
480     }
481
482     @Override
483     public void tcommit() throws RollbackException, HeuristicMixedException,
484             HeuristicRollbackException, java.lang.SecurityException,
485             java.lang.IllegalStateException, SystemException {
486         EmbeddedCacheManager manager = this.cm;
487         if (manager == null) {
488             throw new IllegalStateException();
489         }
490         TransactionManager tm = manager.getCache("transactional-type")
491                 .getAdvancedCache().getTransactionManager();
492         if (tm == null) {
493             throw new IllegalStateException();
494         }
495         tm.commit();
496     }
497
498     @Override
499     public void trollback() throws java.lang.IllegalStateException,
500             java.lang.SecurityException, SystemException {
501         EmbeddedCacheManager manager = this.cm;
502         if (manager == null) {
503             throw new IllegalStateException();
504         }
505         TransactionManager tm = manager.getCache("transactional-type")
506                 .getAdvancedCache().getTransactionManager();
507         if (tm == null) {
508             throw new IllegalStateException();
509         }
510         tm.rollback();
511     }
512
513     @Override
514     public Transaction tgetTransaction() throws SystemException {
515         EmbeddedCacheManager manager = this.cm;
516         if (manager == null) {
517             throw new IllegalStateException();
518         }
519         TransactionManager tm = manager.getCache("transactional-type")
520                 .getAdvancedCache().getTransactionManager();
521         if (tm == null) {
522             return null;
523         }
524         return tm.getTransaction();
525     }
526
527     @Override
528     public boolean amIStandby() {
529         EmbeddedCacheManager manager = this.cm;
530         if (manager == null) {
531             // In case we cannot fetch the information, lets assume we
532             // are standby, so to have less responsibility.
533             return true;
534         }
535         return (!manager.isCoordinator());
536     }
537
538     private InetAddress addressToInetAddress(Address a) {
539         EmbeddedCacheManager manager = this.cm;
540         if ((manager == null) || (a == null)) {
541             // In case we cannot fetch the information, lets assume we
542             // are standby, so to have less responsibility.
543             return null;
544         }
545         Transport t = manager.getTransport();
546         if (t instanceof JGroupsTransport) {
547             JGroupsTransport jt = (JGroupsTransport) t;
548             Channel c = jt.getChannel();
549             if (a instanceof JGroupsAddress) {
550                 JGroupsAddress ja = (JGroupsAddress) a;
551                 org.jgroups.Address phys = (org.jgroups.Address) c
552                         .down(new Event(Event.GET_PHYSICAL_ADDRESS, ja
553                                 .getJGroupsAddress()));
554                 if (phys instanceof org.jgroups.stack.IpAddress) {
555                     InetAddress bindAddress = ((org.jgroups.stack.IpAddress) phys)
556                             .getIpAddress();
557                     return bindAddress;
558                 }
559             }
560         }
561         return null;
562     }
563
564     @Override
565     public List<InetAddress> getClusteredControllers() {
566         EmbeddedCacheManager manager = this.cm;
567         if (manager == null) {
568             return null;
569         }
570         List<Address> controllers = manager.getMembers();
571         if ((controllers == null) || controllers.size() == 0) {
572             return null;
573         }
574
575         List<InetAddress> clusteredControllers = new ArrayList<InetAddress>();
576         for (Address a : controllers) {
577             InetAddress inetAddress = addressToInetAddress(a);
578             if (inetAddress != null
579                     && !inetAddress.getHostAddress().equals(loopbackAddress)) {
580                 clusteredControllers.add(inetAddress);
581             }
582         }
583         return clusteredControllers;
584     }
585
586     @Override
587     public InetAddress getMyAddress() {
588         EmbeddedCacheManager manager = this.cm;
589         if (manager == null) {
590             return null;
591         }
592         return addressToInetAddress(manager.getAddress());
593     }
594
595     @Override
596     public InetAddress getActiveAddress() {
597         EmbeddedCacheManager manager = this.cm;
598         if (manager == null) {
599             // In case we cannot fetch the information, lets assume we
600             // are standby, so to have less responsibility.
601             return null;
602         }
603
604         return addressToInetAddress(manager.getCoordinator());
605     }
606
607     @Override
608     public void listenRoleChange(IListenRoleChange i)
609             throws ListenRoleChangeAddException {
610         EmbeddedCacheManager manager = this.cm;
611         if (manager == null) {
612             // In case we cannot fetch the information, lets assume we
613             // are standby, so to have less responsibility.
614             throw new ListenRoleChangeAddException();
615         }
616
617         if (this.roleChangeListeners == null) {
618             this.roleChangeListeners = new HashSet<IListenRoleChange>();
619             this.cacheManagerListener = new ViewChangedListener(
620                     this.roleChangeListeners);
621             manager.addListener(this.cacheManagerListener);
622         }
623
624         if (this.roleChangeListeners != null) {
625             this.roleChangeListeners.add(i);
626         }
627     }
628
629     @Override
630     public void unlistenRoleChange(IListenRoleChange i) {
631         EmbeddedCacheManager manager = this.cm;
632         if (manager == null) {
633             // In case we cannot fetch the information, lets assume we
634             // are standby, so to have less responsibility.
635             return;
636         }
637
638         if (this.roleChangeListeners != null) {
639             this.roleChangeListeners.remove(i);
640         }
641
642         if ((this.roleChangeListeners != null && this.roleChangeListeners
643                 .isEmpty())
644                 && (this.cacheManagerListener != null)) {
645             manager.removeListener(this.cacheManagerListener);
646             this.cacheManagerListener = null;
647             this.roleChangeListeners = null;
648         }
649     }
650
651     @Listener
652     public class ViewChangedListener {
653         Set<IListenRoleChange> roleListeners;
654
655         public ViewChangedListener(Set<IListenRoleChange> s) {
656             this.roleListeners = s;
657         }
658
659         @ViewChanged
660         public void viewChanged(ViewChangedEvent e) {
661             for (IListenRoleChange i : this.roleListeners) {
662                 i.newActiveAvailable();
663             }
664         }
665     }
666
667     private void removeContainerCaches(String containerName) {
668         logger.info("Destroying caches for container {}", containerName);
669         for (String cacheName : this.getCacheList(containerName)) {
670             this.destroyCache(containerName, cacheName);
671         }
672     }
673
674     @Override
675     public void containerCreate(String arg0) {
676         // no op
677     }
678
679     @Override
680     public void containerDestroy(String container) {
681         removeContainerCaches(container);
682     }
683 }