Je travaille en ce moment sur un middleware de coupage téléphonie-informatique fortement multi-thread pour un client parisien: plusieurs centaines voire plusieurs milliers d’appels concurrents 🙂
J’utilise entre autre la sympathique librairie opensource asterisk-java donc la dernière version stable officielle est la 0.3.1
Pour la gestion du pool de threads, la librairie utilise une LinkedBlockingQueue qui est une file à éléments liés avec traitement FIFO, ce qui est bien dans le contexte temps réel (ou pseudo temps réel) des appels: moins le traitement de l’appel attend dans la file, mieux c’est!
Mais, et oui il y a un mais! Par défaut la librairie utilise une file sans limite (unbouded queue) et qui pour moi a un sacré effet de bord niveau création de nouveaux threads donc mauvaise montée en charge au fil des appels entrants.
Je m’explique, ou plutôt je laisse cet expert expliquer dans son post le choix d’implémentation de SUN Microsystems du ThreadPoolExecutor :
http://www.bigsoft.co.uk/blog/index.php/2009/11/27/rules-of-a-threadpoolexecutor-pool-size
Ce qui nous intéresse est ici:
The difference is that the users want to start increasing the pool size earlier and want the queue to be smaller, where as the Sun method want to keep the pool size small and only increase it once the load becomes to much.
Here are Sun’s rules for thread creation in simple terms:
- If the number of threads is less than the
corePoolSize
, create a new Thread to run a new task. - If the number of threads is equal (or greater than) the
corePoolSize
, put the task into the queue. - If the queue is full, and the number of threads is less than the
maxPoolSize
, create a new thread to run tasks in. - If the queue is full, and the number of threads is greater than or equal to
maxPoolSize
, reject the task.
The long and the short of it is that new threads are only created when the queue fills up, so if you’re using an unbounded queue then the number of threads will not exceed corePoolSize
.
For a fuller explanation, get it from the horses mouth: ThreadPoolExecutor API documentation
There is a really good forum post which talks you through the way that the ThreadPoolExecutor
works with code examples:
http://forums.sun.com/thread.jspa?threadID=5401400&tstart=0
More info: http://forums.sun.com/thread.jspa?threadID=5224557&tstart=450
Most people want it the other way around, so that you increase the number of threads to avoid adding to the queue. When the threads are all in use the queue starts to fill up.
Using Sun’s way, I think you are going to end up with a system that runs slower when the load is light and a bit quicker as the load increases. Using the other way means you are running flat out all the time to process outstanding work.
En gros il ne suffit pas d’augmenter le poolSize pour encaisser plus de charge et quand le poolSize est atteint, ThreadPoolExecutor ne crée de nouveaux threads que si la file est pleine: ce qui n’arrivera JAMAIS avec une file illimitée. La version 0.3.1 d’asterisk-java ne créera donc jamais autant de threads que autorisés dans sa configuration, les nouvelles connexions seront rajoutées à la file d’attente.
J’ai donc forkée asteris-java pour en faire une version custom 0.4.0 qui permet, entre autres évolutions pour les besoins du projet, via le fichier de configuration fastagi.properties de:
- choisir entre une file d’attente illimitée ou limitée en taille
- limiter la taille de la file d’attente en spécifiant la valeur que l’on souhaite
Config:
port=4573 poolSize=50 maximumPoolSize=1000 queueSize=100
Bout de code:
public void startup() throws IOException, IllegalStateException { SocketConnectionFacade socket; AgiConnectionHandler connectionHandler; die = false; if(getQueueSize() >0){ pool = new ThreadPoolExecutor(poolSize, (maximumPoolSize < poolSize) ? poolSize : maximumPoolSize, 50000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(getQueueSize()), new DaemonThreadFactory()); logger.info("Bounded thread queue with size: "+getQueueSize()+" and maxPoolSize: "+pool.getMaximumPoolSize()); }else{ pool = new ThreadPoolExecutor(poolSize, (maximumPoolSize < poolSize) ? poolSize : maximumPoolSize, 50000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory()); logger.info("Unbounded thread and maxPoolSize: "+pool.getMaximumPoolSize()); } logger.info("Thread pool started.");
Le fait de rendre cela configurable permet de faire du test et de l’ajustement: car la file ne doit être ni trop grande ni trop petite sinon impacts sur la performance de l’application. Aucune valeur magique n’existe non plus, c’est très lié à la charge moyenne que l’on attend sur le système… donc à tester 🙂
Mystère: Sur une application SVI en PROD avec une capacité de 30 appels concurrents, 10 de poolSize et 32 de maxPoolSize + file d’attente sans limite: curieusement il y a eu une fois 21 threads lancés! (> 10) –> Selon la logique de SUN, si tous les threads sont utilisés a delà de 10, le 11è appel est mis dans la file donc pas de nouveau thread! A SUIVRE