Server side Faye’s client ou le protocole Bayeux
Dans mon précédent article, je vous ai montré comment mettre en place un Comet avec Capcode. J'ai découvert que certains d'entre vous lisent ce que j'écris1 par le biais de quelques mails dans ma boite aux lettres. L'un d'entre vous m'a même demandé comment faire pour envoyer des évènements sur le bus Comet sans passer par l'application Web. Bonne question, sachant qu'en effet, faye ne propose aucune solution pour cela. Et bien je me suis amusé à en développer une...
Faye
Pour comprendre ce que nous allons faire, il faut prendre le temps de regarder comment fonctionne faye. Mais avant, nous devons choisir la solution à adopter pour écrire notre librairie. En effet, nous avons deux choix possibles : soit nous pouvons parler directement au serveur faye, soit nous pouvons simuler le comportement d'un client. En fonction de ce choix, nous regarderons telle ou telle partie du code de faye. Bon et puisque c'est moi qui choisis2, j'opte pour la seconde solution.
Avant de décortiquer le code, rappelons le cheminement par l'utilisateur lors de l'utilisation de soapbox. Avant tout nous démarrons l'application3. Nous donnons notre username. Ensuite nous indiquons qui nous voulons suivre. Enfin nous racontons notre vie. Ces deux dernières étapes pouvant se faire dans n'importe quel ordre, et autant de fois que nous voulons.
Regardons maintenant le code. Lors de l'accès à l'application (route /) nous recevons la page mise en place par la vue views/index.rhtml. Dans cette vue, ce qui nous intéresse ce sont les lignes suivantes :
1 <script type="text/javascript">
2 Comet = new Faye.Client('/comet');
3 Comet.connect();
4
5 Soapbox.init(Comet);
6 </script>
Nous créons un client Comet (ligne 41) pour le bus accessible via la route /comet puis nous établissons la connexion (ligne 42). Ensuite nous initialisons l'application (ligne 44). Si nous regardons à quoi correspondent la création et la connexion du client Comet, il faut se plonger dans le code du script comet.js. Ce fichier est généré lors de la création du gem faye à partir de l'ensemble des fichiers contenus dans le répertoire client des sources. Le fichier qui nous intéresse principalement ici est client/client.js. Dans ce fichier vous trouverez le code correspondant à la connexion entre les lignes 75 et 113.
Si nous regardons maintenant le fichier soapbox.js nous pouvons étudier ce qui se passe lors de l'initialisation de l'application. Nous voyons dans ce code que soapbox attend que l'utilisateur saisisse son username. Une fois ceci fait, l'application souscrit au channel /mentioning/<username>, elle masque la zone de saisie du username et affiche les zones de saisie des personnes à suivre et des messages. Elle met ensuite en place les actions correspondantes pour ces deux zones de saisie. La première de ces actions se traduit par la souscription au channel /from/<follow> avec comme callback : accept. La seconde revoie vers la méthode post dont le rôle principal est de publier le message sur le channel /from/<username>.
Outch !
Ce qui est important ici c'est de voir ce dont nous avons besoin pour faire cela. Nous en retiendrons donc la nécessité de développer une méthode de connexion, une méthode de souscription et une méthode de publication. Pour comprendre comment ces méthodes agissent, il suffit de regarder ce qui est fait dans les méthodes correspondantes de Faye.Client.
Je ne vais pas pouvoir, au risque de perdre plus de monde que ceux qui ont déjà lâché prise avant la fin du dernier paragraphe, entrer plus dans les détails. Il va donc falloir me croire sur parole ;) En fait, ce que nous allons faire c'est mettre en place une version cliente du protocole des échanges Comet appelé protocole Bayeux. Je vous engage donc à en lire la documentation si vous souhaitez approfondir le sujet.
Messages
Les échanges entre le client Comet et le bus peuvent se faire de deux façons, soit en long-polling soit en callback-polling. Dans notre cas, nous utilisons la première solution en faisant un envoie en POST du paramètre message, ce dernier ayant pour valeur une chaine JSON.
Mettons cela en place :
module Faye
class Client
def initialize( uri_or_string )
@uri = uri_or_string
@uri = URI.parse(@uri) if @uri.class == String
# ...
end
# ...
private
def send( message )
res = Net::HTTP.post_form( @uri, { "message" => message.to_json } )
return JSON.parse( res.body )
end
end
end
Je place cette méthode en private car elle n'a d'intérêt que pour les méthodes que nous allons écrire ensuite.
Comme vous pouvez le voir, le retour est également une structure JSON.
"Handshake"
Si vous regardez précisément ce qui se passe lors de l'initialisation de la connexion du client Comet, vous verrez qu'en fait, il n'y a pas de connexion véritable, mais un handshake. Cela consiste en fait à se présenter au bus Comet et à obtenir de sa part, un identifiant. Nous devrons, par la suite toujours, utiliser cet identifiant pour communiquer avec le bus.
Le message du handshake doit contenir au moins les champs suivants :
- channel : le channel.
- version : la version du protocole.
- supportedConnectionTypes : les types de connexions supportées.
Un channel est représenté comme un chemin sous la forme /foo/bar/... et doit systématiquement commencer par un /. Il en existe deux types : ceux définis par l'application et ceux réservés par le protocole. Ces derniers commencent toujours par /meta/. Dans le cas du handshake, le channel doit être /meta/handshake.
La version doit correspondre au numéro de version du protocole. 1.0 ici.
Le paramètre supportedConnectionTypes contient la liste des types de transports supportés. Bien que nous ayons dit plus haut que nous emploierons du long-polling, nous positionnerons la valeur ["long-polling", "callback-polling"]. Ceci simplement parce que si nous souhaitons implémenter le callback-polling nous n'aurons pas de modification à faire ;)
Voici donc à quoi ressemblera la méthode de handshake :
module Faye
class Client
def initialize( uri_or_string )
@uri = uri_or_string
@uri = URI.parse(@uri) if @uri.class == String
@clientId = nil
@interval = nil
# ...
end
# ...
def handshake
id = Faye.random(32)
message = {
"channel" => Faye::Channel::HANDSHAKE,
"version" => Faye::BAYEUX_VERSION,
"supportedConnectionTypes" => [ "long-polling", "callback-polling" ],
"id" => id
}
response = send( message )[0]
if response["successful"] and response["id"] == id
@clientId = response["clientId"]
@interval = response["advice"]["interval"]
else
raise
end
end
# ...
end
end
Dans cette méthode j'utilise Faye::Channel::HANDSHAKE et Faye::BAYEUX_VERSION, deux constantes déclarées dans faye. Si vous souhaitez être indépendant de ce dernier, vous pouvez remplacer ces valeurs par celle que j'ai indiquée. Vous noterez également que j'ai ajouté dans le message le paramètre optionnel id. Ce paramètre permettra de valider, dans certains cas, que la réponse que nous recevons est bien celle attendu. En effet, si en retour nous retrouvons ce même ID c'est que, à priori tout va bien. Je dis bien "à priori"...
Je vous ai indiqué que la raison d'être de ce
- successful : suffisament parlant je pense ;)
- id : qui doit correspondre à l'ID envoyé.
- clientId : le clientId a utiliser par la suite.
Je ne parlerai pas de la valeur interval que je récupère simplement en pensant à une prochaine évolution de ce petit développement...
Connexion
La connexion se fait en envoyant un message contenant les données suivantes :
- channel : dans le cas d'une connexion nous utiliserons /meta/connect qui sera ici récupéré via la constante Faye::Channel::CONNECT.
- clientId : est la valeur récupérée lors du handshake
- connectionType : je n'y reviens pas, mais vous l'avez compris, nous faisons du long-polling.
- id : là encore, ce n'est que mimétisme.
Voici donc comment nous pouvons implémenter cela :
module Faye
class Client
def initialize( uri_or_string )
@uri = uri_or_string
@uri = URI.parse(@uri) if @uri.class == String
@clientId = nil
@interval = nil
# ...
end
# ...
def connect
id = Faye.random(32)
message = {
"channel" => Faye::Channel::CONNECT,
"clientId" => @clientId,
"connectionType" => "long-polling",
"id" => id
}
r = send( message )
# ...
end
end
end
A ce niveau, si vous faites un petit test, vous vous rendrez compte que la connexion attend une réponse de la part du serveur. En effet cette connexion attend que le bus Comet envoie des données. C'est ainsi que nous simulons le push ! Ce sont ces données que nous devrons donc traiter en fonction des souscriptions.
Pour savoir si le retour est exact, nous devons retrouver dans la structure JSON le champ successful à true et le champ id avec la même valeur que celle envoyée.
Si tout se passe bien, alors nous pouvons récupérer les données du message dans la structure data :
module Faye
class Client
def initialize( uri_or_string )
@uri = uri_or_string
@uri = URI.parse(@uri) if @uri.class == String
@clientId = nil
@interval = nil
# ...
end
# ...
def connect
id = Faye.random(32)
message = {
"channel" => Faye::Channel::CONNECT,
"clientId" => @clientId,
"connectionType" => "long-polling",
"id" => id
}
r = send( message )
if r[0]["id"] == id and r[0]["successful"] == true
# traitement du message contenu dans r[1]["data"] pour la souscription au channel r[1]["channel"]
# ...
elsif r[0]["successful"] == false
# ...
end
# ...
end
# ...
end
end
Nous verrons au paragraphe souscription comment traiter le message de réponse. Avant cela, détaillons ce que nous devons faire en cas d'échec. Le protocole Bayeux nous indique que dans un tel cas, il faut refaire un handshake puis refaire la connexion. Cependant, comme la connexion elle-même est bloquante, dans le sens où elle attend une réponse, il serait bon de l'isoler dans un thread afin de permettre le déroulement de notre programme. De plus, si la connexion est un succès, une fois le message traité, il faut en rouvrir une de façon à se remettre en attente d'un nouveau push du serveur. Pour cela nous pouvons placer la connexion dans une boucle infinie.
Voici donc comment nous allons gérer cela :
module Faye
class Client
def initialize( uri_or_string )
@uri = uri_or_string
@uri = URI.parse(@uri) if @uri.class == String
@clientId = nil
@interval = nil
@connection = nil
# ...
end
# ...
def connect
@connection.kill unless @connection.nil?
@connection = Thread.new {
faild = false
while true
id = Faye.random(32)
message = {
"channel" => Faye::Channel::CONNECT,
"clientId" => @clientId,
"connectionType" => "long-polling",
"id" => id
}
r = send( message )
if r[0]["id"] == id and r[0]["successful"] == true
# traitement du message contenu dans r[1]["data"] pour la souscription au channel r[1]["channel"]
# ...
elsif r[0]["successful"] == false
faild = true
break
end
end
if faild
handshake()
connect()
end
}
end
# ...
end
end
Souscription
La souscription se fait pour un ou plusieurs channels avec, en second paramètre, la méthode qui doit être utilisée pour traiter les massages venant de ces channels. Nous avons déjà parlé des channels. Bien entendu dans le cas présent il s'agira de channels spécifiques à l'application. Concernant le callback, nous utiliserons un block.
Le message envoyé au bus Comet doit contenir les informations suivantes :
- channel : il ne s'agit pas, ici, du channel auquel nous souhaitons souscrire, mais celui utilisé par le protocole pour déclarer une souscription : /meta/subscribe, ou, dans notre cas ou nous utilisons faye : Faye::Channel::SUBSCRIBE.
- clientId : le client ID récupéré lors du handshake.
- subscription : ce paramètre prend en valeur un tableau des channels auxquels nous souhaitons souscrire.
- id : l'id du message.
Comme vous pouvez le voir, nulle part nous ne stockons le callback à utiliser pour traiter les messages renvoyés. C'est normal puisque le bus Comet n'en à que faire puisqu'il seront exécutés côté client. Il nous appartient donc de les stocker. Nous ferons cela en mettant en place un hashage ayant pour clé le channel et comme valeur le block.
module Faye
class Client
def initialize( uri_or_string )
@uri = uri_or_string
@uri = URI.parse(@uri) if @uri.class == String
@clientId = nil
@interval = nil
@connection = nil
@subscriptions = {}
end
# ...
def subscribe( channels, &block )
channels = [channels] unless channels.class == Array
if block
channels.each do |c|
@subscriptions[c] = block
end
end
message = {
"channel" => Faye::Channel::SUBSCRIBE,
"clientId" => @clientId,
"subscription" => channels,
"id" => Faye.random(32)
}
r = send(message)
end
# ...
end
end
Je ne prends pas la peine de gérer le retour. Ce n'est pas bien !
Connexion
Nous pouvons maintenant revenir sur la connexion afin de traiter les messages. Il suffit donc de passer le message au block correspond au channel qui l'a poussé :
# ...
def connect
@connection.kill unless @connection.nil?
@connection = Thread.new {
faild = false
while true
id = Faye.random(32)
message = {
"channel" => Faye::Channel::CONNECT,
"clientId" => @clientId,
"connectionType" => "long-polling",
"id" => id
}
r = send( message )
if r[0]["id"] == id and r[0]["successful"] == true
@subscriptions[r[1]["channel"]].call( r[1]["data"])
elsif r[0]["successful"] == false
faild = true
break
end
end
if faild
handshake()
connect()
end
}
end
# ...
Publication
La publication se fait par l'envoi d'un message contenant les champs suivants :
- channel : le channel destinataire du message.
- data : les données, sous forme d'une structure JSON.
- clientId : le fameux client ID.
- id : par habitude ;)
Ceci nous donne donc le code suivant :
module Faye
class Client
# ...
def publish( channel, data )
message = [
{
"channel" => channel,
"data" => data,
"clientId" => @clientId,
"id" => Faye.random(32)
}
]
r = send(message)[0]
end
# ...
end
end
Encore une fois, je ne prends pas le temps de traiter la réponse... Encore une fois, ce n'est pas bien !!!
... et leur contraire
Là où il y a connexion, il y a forcement "déconnexion", de même la où il y a souscription, il y a désabonnement. Je pense que vous avez compris le principe, et je me permets donc de vous livrer sans autre explication les méthodes correspondantes :
module Faye
class Client
# ...
def disconnect
unless @connection.nil?
@connection.kill
message = {
"channel" => Faye::Channel::DISCONNECT,
"clientId" => @clientId,
"id" => Faye.random(32)
}
r = send( message )
end
end
# ...
def unsubscribe( channels )
channels = [channels] unless channels.class == Array
channels.each do |c|
@subscriptions.delete(c)
end
message = {
"channel" => Faye::Channel::UNSUBSCRIBE,
"clientId" => @clientId,
"subscription" => channels,
"id" => Faye.random(32)
}
r = send(message)
end
# ...
end
end
Let's play !
Maintenant que nous avons notre librairie cliente, nous pouvons développer un petit client en ligne de commande pour soapbox.
x = Faye::Client.new( 'http://localhost:3000/comet' )
puts "-- handshake"
x.handshake
puts "-- subscriptions"
x.subscribe( "/mentioning/daemon" )
x.subscribe( "/from/greg" ) { |r|
puts "#{r["user"]} : #{r["message"]}"
}
puts "-- connect"
x.connect
msg = ""
while msg != "quit"
msg = $stdin.readline.chomp
unless msg == "quit"
channel = "/from/daemon"
data = { "user" => "daemon", "message" => msg }
r = x.publish( channel, data )
unless r["successful"]
puts "=> Message not send !"
end
end
end
x.disconnect
Bon, OK, c'est très très rudimentaire, mais vous avez compris ! Et le principal c'est que cela fonctionne :

A+
Attention, n'oubliez pas la remarque faite sur la page du projet faye : it's a toy. Donc, tout comme ce qui précède, n'utilisez pas cela en production4. Et bien cela s'applique, à plus forte raison, à ce qui est écrit ci-dessus. Maintenant, si vous êtes joueur, vous pouvez récupérer cela dans les sources de Capcode ou ici.Notez que si je me suis attaché à faye, ce petit client devrait pouvoir fonctionner avec toute solution Comet respectant le protocole Bayeux.
1 Merci !
2 :P
3 Ca peut paraitre idiot, cela n'en reste pas moins vrai !
4 On vous aura prévenu !