27-05-2013, 03:44 PM
Bonjour,
Dans le cadre de mon système de client Rss j'ai un crontab qui tourne pour traiter les feeds et les nouveaux éléments, j'ai décidé vous faire un petit retour d'expérience à ce sujet.
J'ai décidé de m'affranchir des crontabs et de tout traiter ça en interne, pour plus de "souplesse".
Alors voici le code nécessaire sous Play :
Le code est plutôt simple, au démarrage de l'application on appel syncFeed qui va utiliser Akka (qui est en gros la nouvelle bibliothèque standard pour les acteurs sur Scala).
Akka permet de créer des scheduler, ici je précise qu'il démarre immédiatement (0.seconds) et qu'il se répète toutes les 1.minutes et qu'il appelle à chaque fois controllers.Process.all.
J'ai cependant eu un soucis avec ce système, toutes les minutes il appelle Process.all que le traitement soit terminé ou non, ce qui fait que si le traitement prend plus de une minute, ça peut vite devenir la pagaille, les process s'entassent, le système devient de plus en plus lent, s'embourbe et fini par planter.
Il faut savoir que le scheduler de Akka utilise une mailbox, c'est une boite qui liste tous les événements à traiter, j'ai donc configuré Akka pour que la mailbox ne puisse avoir plus de 1 item :
Ainsi, mon soucis est réglé
Autre chose, mon traitement de feed était synchrone :
Donc si un feed prenait beaucoup de temps à être traité, ça ralentissait tout le système.
J'ai donc décider de paralléliser tout ça.
Et Scala a une super solution pour ça, les collections parallèle.
Il peut très facilement paralléliser divers type de collections.
Ce système ne marche pas avec une boucle for, mais marche par exemple avec la structure map. Qui est très semblable à un for, mon code précédent peut être écrit de cette façon avec un map :
Pour qu'il soit parallelisé il suffit d'écrire :
J'ai juste rajouté la méthode par.
Mais... Je me suis rendu compte que ce code plantait aléatoirement au bout d'une heure environ, un plantage du type blocage sans aucun message d'erreur.
Il s'avère en fait que c'est parce que j'utilise des variables dans mon map et donc mes threads sont pas safe, puisque plusieurs peuvent écrire/lire nbFeed ou newArticle en même temps. Et par moment ça provoquait un blocage.
J'ai donc réécrit mon code pour qu'il soit thread safe :
Le début du code est semblable au précédent :
Fold ici, initialise la variable sum à 0, parcourt tous les éléments de la collection et va ajouter à sum chaque value.
Mais en sugar syntax je l'ai écrit comme ça :
Voilà un petit retour d'expérience sur une problématique que j'ai vécu
Dans le cadre de mon système de client Rss j'ai un crontab qui tourne pour traiter les feeds et les nouveaux éléments, j'ai décidé vous faire un petit retour d'expérience à ce sujet.
J'ai décidé de m'affranchir des crontabs et de tout traiter ça en interne, pour plus de "souplesse".
Alors voici le code nécessaire sous Play :
object Global extends GlobalSettings {
override def onStart(app: Application) {
syncFeed
}
def syncFeed {
Akka.system.scheduler.schedule(0.seconds, 1.minutes) {
controllers.Process.all
}
}
}
Le code est plutôt simple, au démarrage de l'application on appel syncFeed qui va utiliser Akka (qui est en gros la nouvelle bibliothèque standard pour les acteurs sur Scala).
Akka permet de créer des scheduler, ici je précise qu'il démarre immédiatement (0.seconds) et qu'il se répète toutes les 1.minutes et qu'il appelle à chaque fois controllers.Process.all.
J'ai cependant eu un soucis avec ce système, toutes les minutes il appelle Process.all que le traitement soit terminé ou non, ce qui fait que si le traitement prend plus de une minute, ça peut vite devenir la pagaille, les process s'entassent, le système devient de plus en plus lent, s'embourbe et fini par planter.
Il faut savoir que le scheduler de Akka utilise une mailbox, c'est une boite qui liste tous les événements à traiter, j'ai donc configuré Akka pour que la mailbox ne puisse avoir plus de 1 item :
play {
akka {
actor {
mailbox {
max-items = 1
}
}
}
}
Ainsi, mon soucis est réglé
Autre chose, mon traitement de feed était synchrone :
for (feed <- feeds) {
nbFeed = nbFeed+1
newArticle += process(feed)
}
Donc si un feed prenait beaucoup de temps à être traité, ça ralentissait tout le système.
J'ai donc décider de paralléliser tout ça.
Et Scala a une super solution pour ça, les collections parallèle.
Il peut très facilement paralléliser divers type de collections.
Ce système ne marche pas avec une boucle for, mais marche par exemple avec la structure map. Qui est très semblable à un for, mon code précédent peut être écrit de cette façon avec un map :
feeds.map(feed =>
nbFeed = nbFeed+1
newArticle += process(feed)
)
Pour qu'il soit parallelisé il suffit d'écrire :
feeds.par.map(feed =>
nbFeed = nbFeed+1
newArticle += process(feed)
)
J'ai juste rajouté la méthode par.
Mais... Je me suis rendu compte que ce code plantait aléatoirement au bout d'une heure environ, un plantage du type blocage sans aucun message d'erreur.
Il s'avère en fait que c'est parce que j'utilise des variables dans mon map et donc mes threads sont pas safe, puisque plusieurs peuvent écrire/lire nbFeed ou newArticle en même temps. Et par moment ça provoquait un blocage.
J'ai donc réécrit mon code pour qu'il soit thread safe :
val nbFeed = feeds.length
val newArticle = feeds.par.map(feed =>
process(feed)
).fold(0)((sum, value) =>
sum+value
)
Le début du code est semblable au précédent :
feeds.par.map(feed => process(feed))
Mais je devais compter le nombre de newArticle et j'ai donc utilisé la méthode fold pour ça.Fold ici, initialise la variable sum à 0, parcourt tous les éléments de la collection et va ajouter à sum chaque value.
Mais en sugar syntax je l'ai écrit comme ça :
val newArticle = feeds.par.map(process(_)).fold(0)(_+_)
Voilà un petit retour d'expérience sur une problématique que j'ai vécu