Asterisk-Java, Multi-threading, Pool et Queue: mon analyse et mon hook !

Je travaille en ce moment sur un middleware de coupage téléphonie-informatique fortement multi-thread pour un client parisien: plusieurs centaines voire plusieurs milliers d’appels concurrents 🙂

J’utilise entre autre la sympathique librairie opensource asterisk-java donc la dernière version stable officielle est la 0.3.1

Pour la gestion du pool de threads, la librairie utilise une LinkedBlockingQueue qui est une file à éléments liés avec traitement FIFO, ce qui est bien dans le contexte temps réel (ou pseudo temps réel) des appels: moins le traitement de l’appel attend dans la file, mieux c’est!

Mais, et oui il y a un mais! Par défaut la librairie utilise une file sans limite (unbouded queue) et qui pour moi a un sacré effet de bord niveau création de nouveaux threads donc mauvaise montée en charge au fil des appels entrants.

Je m’explique, ou plutôt je laisse cet expert expliquer dans son post le choix d’implémentation de SUN Microsystems du ThreadPoolExecutor :

http://www.bigsoft.co.uk/blog/index.php/2009/11/27/rules-of-a-threadpoolexecutor-pool-size

Ce qui nous intéresse est ici:

The difference is that the users want to start increasing the pool size earlier and want the queue to be smaller, where as the Sun method want to keep the pool size small and only increase it once the load becomes to much.

Here are Sun’s rules for thread creation in simple terms:

  1. If the number of threads is less than the corePoolSize, create a new Thread to run a new task.
  2. If the number of threads is equal (or greater than) the corePoolSize, put the task into the queue.
  3. If the queue is full, and the number of threads is less than the maxPoolSize, create a new thread to run tasks in.
  4. If the queue is full, and the number of threads is greater than or equal to maxPoolSize, reject the task.

The long and the short of it is that new threads are only created when the queue fills up, so if you’re using an unbounded queue then the number of threads will not exceed corePoolSize.

For a fuller explanation, get it from the horses mouth: ThreadPoolExecutor API documentation

There is a really good forum post which talks you through the way that the ThreadPoolExecutor works with code examples:
http://forums.sun.com/thread.jspa?threadID=5401400&tstart=0

More info: http://forums.sun.com/thread.jspa?threadID=5224557&tstart=450

Most people want it the other way around, so that you increase the number of threads to avoid adding to the queue. When the threads are all in use the queue starts to fill up.

Using Sun’s way, I think you are going to end up with a system that runs slower when the load is light and a bit quicker as the load increases. Using the other way means you are running flat out all the time to process outstanding work.

En gros il ne suffit pas d’augmenter le poolSize pour encaisser plus de charge et quand le poolSize est atteint, ThreadPoolExecutor ne crée de nouveaux threads que si la file est pleine: ce qui n’arrivera JAMAIS avec une file illimitée. La version 0.3.1 d’asterisk-java ne créera donc jamais autant de threads que autorisés dans sa configuration, les nouvelles connexions seront rajoutées à la file d’attente.

J’ai donc forkée asteris-java pour en faire une version custom 0.4.0 qui permet, entre autres évolutions pour les besoins du projet, via le fichier de configuration fastagi.properties de:

  • choisir entre une file d’attente illimitée ou limitée en taille
  • limiter la taille de la file d’attente en spécifiant la valeur que l’on souhaite

Config:

port=4573
poolSize=50
maximumPoolSize=1000
queueSize=100

Bout de code:

 public void startup() throws IOException, IllegalStateException
    {
        SocketConnectionFacade socket;
        AgiConnectionHandler connectionHandler;

        die = false;

        if(getQueueSize() >0){
             pool = new ThreadPoolExecutor(poolSize, (maximumPoolSize < poolSize) ? poolSize : maximumPoolSize, 50000L,
                     TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(getQueueSize()), new DaemonThreadFactory());
             logger.info("Bounded thread queue with size: "+getQueueSize()+" and maxPoolSize: "+pool.getMaximumPoolSize());

        }else{
            pool = new ThreadPoolExecutor(poolSize, (maximumPoolSize < poolSize) ? poolSize : maximumPoolSize, 50000L,
                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory());    
            logger.info("Unbounded thread and maxPoolSize: "+pool.getMaximumPoolSize());
        }

        logger.info("Thread pool started.");

Le fait de rendre cela configurable permet de faire du test et de l’ajustement: car la file ne doit être ni trop grande ni trop petite sinon impacts sur la performance de l’application. Aucune valeur magique n’existe non plus, c’est très lié à la charge moyenne que l’on attend sur le système… donc à tester 🙂

Mystère: Sur une application SVI en PROD avec une capacité de 30 appels concurrents, 10 de poolSize et 32 de maxPoolSize + file d’attente sans limite: curieusement il y a eu une fois 21 threads lancés! (> 10) –> Selon la logique de SUN, si tous les threads sont utilisés a delà de 10, le 11è appel est mis dans la file donc pas de nouveau thread! A SUIVRE

Faites de l’Asterisk avec du Java: Couplage Asterisk et Web Service REST Java

J’ai une affection particulière pour les projets liant ces 2 mondes:

  • télécom
  • informatique

J’utiliserai le terme TelcoWeb, découvert via un de mes 2 responsables de stages de fin d’études, il y a quelques années déjà (le temps passe vite :-)) pour désigner ce genre de projet à cheval sur ces univers.

Asterisk http://fr.wikipedia.org/wiki/Asterisk_%28logiciel%29

  • Serveur de téléphonie, Messagerie vocale, conférence téléphonique
  • Serveur Vocal Interactif
  • Centre d’appels
  • Passerelle VoIP vers d’autres types de réseaux…

Java: Si vous ne savez pas ce que sait que ce truc, arrêtez de lire cet article et allez vous prendre un café avant d’avoir mal à la tête 🙂

Lire la suite