Créer un système de messagerie instantanée avec Ruby, AMQP et Ncurses
Dans ce post, nous allons nous amuser à mettre en place un système de messagerie instantanée en utilisant AMQP. AMQP est un protocole permettant de gérer des échanges de messages entre applications. Il s'agit d'un protocole ouvert dont il existe de multiples implémentations. Dans cet exemple, nous utiliserons Ruby pour créer notre système de chat.
Afin de rendre notre client de messagerie un peu plus convivial, nous nous amuserons à mettre en place une interface utilisateur en mode texte avec Ncurses. Aucun rapport avec AMQP bien entendu, il s'agit simplement de s'amuser un peu ;)
Autant vous prévenir tout de suite, je ne vais faire qu'effleurer les deux sujets (AMQP et Ncurses). Mon objectif étant de vous faire toucher du doigt ces différentes technologies, en espérant attirer votre curiosité, et en vous laissant le plaisir d'approfondir ces sujets par vous même.
AMQP
Avant de se lancer dans le développement, il faut comprendre les mécanismes d'AMQP. Pour cela, regardons le schéma1 suivant :

Comme vous pouvez le voir, le fonctionnement est très simple. Nous avons des applications qui envoient des messages à des exchanges sur le serveur. Les exchanges se chargent de router les messages vers des queues. Les clients s'étant abonnés à ces queues recevront alors les messages. Bien entendu, les clients produisant et consommant les messages peuvent être les mêmes.
Jusque-là, rien de bien compliqué. Ce qu'il faut bien comprendre, ce sont les différents types d'exchanges et le fonctionnement des queues. Avant de voir cela, voyons sommairement comment mettre en place un client.
La première étape consiste à se connecter au broker. Une fois la connexion établie, nous créons un channel à partir duquel nous mettons en place un exchange et une queue. Pour envoyer un message, il suffit de le publier auprès de l'exchange. A l'autre bout, pour recevoir les messages, nous allons souscrire à la queue. Voici un exemple :
1 require "rubygems"
2 require "amqp"
3
4 AMQP.start("amqp://localhost") do |connection|
5 channel = AMQP::Channel.new(connection)
6 queue = channel.queue("example.hello", :auto_delete => true)
7 exchange = channel.direct("")
8
9 queue.subscribe do |headers, payload|
10 puts "Message reçu: #{payload}"
11
12 connection.close {
13 EM.stop { exit }
14 }
15 end
16
17 exchange.publish "Hello, world!", :routing_key => queue.name
18 end
Nous utilisons la méthode AMQP.start (ligne 4) pour initialiser la connexion. Cette méthode prend en paramètre l'URL de connexion au broker. Nous créons ensuite le channel, la queue et l'exchange (lignes 5 à 7). Pour la queue, vous noterez l'utilisation du paramètre :auto_delete permettant d'indiquer que l'exchange peut être supprimé quand il n'y a plus de queue qui l'utilise. A la ligne 9, nous nous inscrivons auprès de la queue afin de récupérer les messages. La méthode subscribe prend en paramètre un bloc recevant les informations d'entête des messages (headers) ainsi que le corps de ces messages (payload). Dans le bloc, nous affichons simplement le contenu du message et nous fermons la connexion avec le broker. Vous remarquerez que j'utilise un EM.stop lors de la fermeture de la connexion. En effet, la gem amqp utilise EventMachine. Nous aurions d'ailleurs pu utiliser la syntaxe suivante pour notre exemple :
EventMachine.run do
connection = AMQP.connect("amqp://localhost")
# ...
end
Pour envoyer un message, nous utilisons la méthode publish (ligne 17). Nous utilisons l'option :routing_key afin d'indiquer à l'exchange comment il doit router le message (vers quelle queue).
Les exchanges et les queues sont gérés par le serveur (aussi appelé broker). Nous n'allons pas écrire ce serveur. Le but de ce post n'étant pas d'implémenter le protocole AMQP2 mais de l'utiliser. Nous allons donc utiliser une implémentation existante. Pour cela vous avez le choix entre3 OpenAMQ, StormMQ, RabbitMQ ou Qpid. Pour la partie client, j'utilise Ruby avec la gem amqp.
Les exchanges
Comme je l'ai indiqué, les exchanges reçoivent et routent les messages. Il existe quatre types d'exchanges :
direct
Cet exchange est utilisé pour de l'échange 1:1. Il utilise une clé de routage pour délivrer les messages à une queue. Pour voir cela, écrivons un client :
# client.rb
require "rubygems"
require "amqp"
who = ARGV[0]
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
queue = channel.queue("hello.#{who}", :auto_delete => true)
queue.subscribe do |headers, payload|
puts "Message reçu: #{payload}"
connection.close {
EM.stop { exit }
}
end
end
Et un serveur :
# server.rb
require "rubygems"
require "amqp"
who = ARGV[0]
puts "Say hello to #{who} via hello.#{who}"
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.direct("")
exchange.publish "Bonjour #{who}!", :routing_key => "hello.#{who}"
Thread.new do
sleep 1
connection.close {
EM.stop { exit }
}
end
end
Nous utilisons l'option :routing_key, lors de l'envoi du message (method publish), pour préciser à quelle queue doit être envoyé le message. Si vous démarrez plusieurs clients, avec, en paramètre des noms différents, et que vous exécutez plusieurs fois le serveur avec ces mêmes noms, vous verrez que chaque client recevra le bon message.

fanout
Cet exchange envoie les messages vers toutes les queues qui lui sont rattachées, en ignorant les clés de routage. Pour vérifier cela, modifier le client de la façon suivante :
# client.rb
require "rubygems"
require "amqp"
who = ARGV[0]
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.fanout("public.message")
channel.queue("hello.#{who}", :auto_delete => true).bind(exchange).subscribe do |headers, payload|
puts "Message reçu: #{payload}"
connection.close {
EM.stop { exit }
}
end
end
Dans ce code, nous déclarons un exchange avec le nom public.message, et nous utilisons la méthode bind pour lier l'exchange à la queue. Modifiez ensuite le serveur :
# server.rb
require "rubygems"
require "amqp"
who = ARGV[0]
puts "Say hello to #{who} via hello.#{who}"
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.fanout("public.message")
exchange.publish "Bonjour #{who}!", :routing_key => "hello.#{who}"
Thread.new do
sleep 1
connection.close {
EM.stop { exit }
}
end
end
Ici, nous faisons attention d'utiliser l'exchange de même nom. Si vous testez en lançant plusieurs clients, avec des noms différents, vous verrez que dans tous les cas, quand nous envoyons un message, il sera reçu systématiquement par tous les clients.

topic
Les exchanges de type topic permettent de router des messages vers une ou plusieurs queues en utilisant des clés de routages qui peuvent être décrites sous forme de pattern.
Les patterns utilisés pour décrire les clés de routages peuvent utiliser 2 caractères spéciaux :
- # qui match zéro ou plusieurs mots.
- * qui match un mot exactement.
Voici un exemple d'utilisation :
# topic.rb
require "rubygems"
require "amqp"
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.topic("os.info", :auto_delete => true)
channel.queue("comp.os.unix").bind(exchange, :routing_key => "comp.os.unix.#").subscribe do |headers, payload|
puts "Reçu par comp.os.unix: #{payload}"
end
channel.queue("comp.os.unix.linux").bind(exchange, :routing_key => "comp.os.unix.linux").subscribe do |headers, payload|
puts "Reçu par comp.os.unix.linux: #{payload}"
end
channel.queue("comp.os.unix.bsd").bind(exchange, :routing_key => "comp.os.unix.bsd").subscribe do |headers, payload|
puts "Reçu par comp.os.unix.bsd: #{payload}"
end
channel.queue("comp.os.mac").bind(exchange, :routing_key => "comp.os.mac").subscribe do |headers, payload|
puts "Reçu par comp.os.mac: #{payload}"
end
channel.queue("comp.os").bind(exchange, :routing_key => "comp.os.*").subscribe do |headers, payload|
puts "Reçu par comp.os: #{payload}"
end
EventMachine.add_timer(1) do
exchange.publish("Hello Linux!", :routing_key => "comp.os.unix.linux")
exchange.publish("Hello BSD!", :routing_key => "comp.os.unix.bsd")
exchange.publish("Hello Mac!", :routing_key => "comp.os.mac")
exchange.publish("Hello Unix!", :routing_key => "comp.os.unix")
end
EM.add_timer(3, Proc.new { connection.close { EventMachine.stop } })
end

headers
Les exchanges de types headers utilisent un système de routage basé sur les données d'entête des messages pour dispatcher ces derniers vers les queues.
En effet, il est possible d'attacher aux messages envoyés, des informations personnalisées. Voyons le code suivant :
# client.rb
require "rubygems"
require "amqp"
who = ARGV[0]
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.fanout("headers.example")
channel.queue("headers.queue", :auto_delete => true).bind(exchange).subscribe do |headers, payload|
puts "Message reçu: #{payload}"
p headers.attributes[:headers]
connection.close {
EM.stop { exit }
}
end
exchange.publish "Bonjour #{who}!", :headers => { :toto => "hello", :titi => 42 }
Thread.new do
sleep 1
connection.close {
EM.stop { exit }
}
end
end
Si vous exécutez cet exemple, vous verrez que nous récupérons, via la clé :headers de l'attribut attributes des headers ce qui a été passé via l'option :headers lors de la publication du message.
C'est sur ce type de données que vont se baser les exchanges de type headers. Pour cela, nous utiliserons l'option :arguments lors du rattachement de l'exchange à une queue :
queue.bind(exchange, :arguments => { :toto => hello, :titi => 42 })
Le hash passé comme valeur de l'option :arguments peut également contenir un tuple avec la clé x-match indiquant comment doit se faire le matching. Dans ce tuple, la valeur possible peut-être :
- all dans ce cas, il faudra que tous les tuples clé/valeur envoyés via l'option :headers match avec les tuples de l'option :arguments du binding.
- any dans ce cas, il faudra qu'au moins un des tuples clé/valeur envoyés via l'option :headers match avec un des tuples de l'option :arguments du binding.
require "rubygems"
require "amqp"
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.headers("os.info", :durable => true)
channel.queue("comp.os.unix").bind(exchange, :arguments => { :os => "unix" }).subscribe do |metadata, payload|
puts "Reçu par comp.os.unix: #{payload}"
end
channel.queue("comp.os.unix.linux").bind(exchange, :arguments => { :os => "unix", :name => "linux" }).subscribe do |metadata, payload|
puts "Reçu par comp.os.unix.linux: #{payload}"
end
channel.queue("comp.os.unix.bsd").bind(exchange, :arguments => { :os => "unix", :name => "bsd" }).subscribe do |metadata, payload|
puts "Reçu par comp.os.unix.bsd: #{payload}"
end
channel.queue("comp.os.mac").bind(exchange, :arguments => { 'x-match' => 'any', :os => "mac", :version => "10.7" }).subscribe do |metadata, payload|
puts "Reçu par comp.os.mac: #{payload}"
end
channel.queue("comp.os").bind(exchange).subscribe do |metadata, payload|
puts "Reçu par comp.os: #{payload}"
end
EventMachine.add_timer(1) do
exchange.publish("Hello Linux!", :headers => { :os => "unix", :name => "linux" })
exchange.publish("Hello BSD!", :headers => { :os => "unix", :name => "bsd" })
exchange.publish("Hello Mac!", :headers => { :os => "mac" })
exchange.publish("Hello Unix!", :headers => { :os => "unix" })
end
EM.add_timer(3, Proc.new { connection.close { EventMachine.stop } })
end

Les queues
Avant de continuer, et avancer sur l'objectif de ce post, qui est de faire un système de messagerie instantanée, je voudrais juste faire un aparté sur les queues. Vous avez certainement remarqué que j'ai souvent utilisé l'option :auto_delete lors de la création de queues. Cette option permet de demander à ça que la queue soit détruite quand elle n'est plus utilisée par aucun client. Par défaut ce n'est pas le cas, et c'est plutôt une bonne chose. Il est ainsi possible d'envoyer des messages dans une ou plusieurs queues, qui seront consommés par des clients qui ne sont pas connectés.
Pour illustrer cela, voici le code deux deux clients et un serveur :
# client1.rb
require "rubygems"
require "amqp"
who = ARGV[0]
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.fanout("public.message")
channel.queue("hello.client1").bind(exchange).subscribe do |headers, payload|
puts "Message reçu: #{payload}"
connection.close {
EM.stop { exit }
}
end
end
Ce premier client utilise la queue hello.client1.
# client2.rb
require "rubygems"
require "amqp"
who = ARGV[0]
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.fanout("public.message")
channel.queue("hello.client2").bind(exchange).subscribe do |headers, payload|
puts "Message reçu: #{payload}"
connection.close {
EM.stop { exit }
}
end
end
Ce second client utilise la queue hello.client2.
require "rubygems"
require "amqp"
who = ARGV[0]
AMQP.start("amqp://localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.fanout("public.message")
channel.queue("hello.client1").bind(exchange)
channel.queue("hello.client2").bind(exchange)
exchange.publish "[#{Time.now}] Bonjour clients!"
EM.add_timer(0.5, Proc.new { connection.close { EventMachine.stop } })
end
Le serveur va également créer les queues hello.client1 et hello.client2 avant de publier son message. Ainsi, les queues n'étant jamais détruites, les clients pourront récupérer le message envoyé par le serveur, y compris s'ils sont démarrés après.

Dans cet exemple, j'utilise RabbitMQ comme broker et la commande rabbitmqctl pour lister l'état des queues.
Première version
Maintenant que nous savons comment utiliser AMQP, nous pouvons écrire une première version de notre système de chat :
1 #!/urs/bin/env ruby
2
3 require "rubygems"
4 require "amqp"
5
6 nickname = ARGV[0]
7 raise "Usage : #{$0} <nickname>" if nickname.nil?
8
9 AMQP.start("amqp://localhost") do |connection|
10 channel = AMQP::Channel.new(connection)
11 exchange = channel.fanout("amqp.chat.v1")
12
13 queue = channel.queue(nickname, :auto_delete => true)
14
15 queue.bind(exchange).subscribe do |headers, payload|
16 puts "#{payload}"
17 end
18
19 exchange.publish "** #{nickname} enter"
20
21 Thread.new do
22 while true do
23 message = STDIN.gets.chomp
24 case message
25 when /^\/quit\s*(.*)/
26 exchange.publish "** #{nickname} has quit (#{$1})"
27 connection.close {
28 EventMachine.stop
29 }
30 else
31 exchange.publish "[#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{message}"
32 end
33 end
34 end
35 end
Si vous avez bien lu la première partie de ce post, vous ne devriez pas être trop perdu avec le code ci-dessus. Je me suis contenté de mettre en place un exchange de type fanout. Chaque client se connectant créé une queue ayant comme nom le nickname choisit par l'utilisateur. Je fais ensuite une boucle infinie en capturant la saisie au clavier. Si cette saisie commence par /quit alors nous terminons la session.
Voici une capture de cette première version :

Seconde version : ajoutons des fonctionnalités
Afin de rendre notre système de messagerie plus intéressant, nous pouvons essayer de lui ajouter des fonctionnalités. Je vous propose les suivantes :
- Envoi de messages privés entre utilisateurs. Pour cela nous utiliserons la syntaxe /privmsg <nickname> <message>
- Envoi de message d'humeur, en utilisant la syntaxe /me <message>
- Demande de la liste des utilisateurs connectés, en utilisant la syntaxe /who
- Affichage de l'aide avec la commande /help
Comme vous pouvez l'imaginer, il va falloir changer de méthode pour gérer différents types de messages. En effet, nous avons besoin de plusieurs queues : une pour les messages publics, une pour les messages privés et une pour les messages systèmes. Je vous propose d'utiliser un exchange de type headers.
exchange = channel.headers("amqp.chat.v2")
Associé à cet exchange, chaque utilisateur créera trois queues, une pour chaque type de message.
La première queue servira à recevoir les messages publics. Elle aura donc un nom du type public.<nickname> et sera rattaché à l'exchange avec comme arguments un simple tuple :type => 'public' :
public_queue = channel.queue("public.#{nickname}", :auto_delete => true)
public_queue.bind(exchange, :arguments => { :type => 'public' })
Pour délivrer un message privé, il nous suffira donc de mettre dans les headers le tuple :type => 'public'.
exchange.publish message, :headers => {:type => 'public'}
Le second type de message est celui de type privé. Ces messages ne sont donc visibles que pour l'utilisateur concerné. Dans ce cas, nous allons créer une queue avec comme arguments un tuple :type => 'private' et un autre avec la clé :nick et comme valeur le nickname de l'utilisateur.
private_queue = channel.queue("private.#{nickname}", :auto_delete => true)
private_queue.bind(exchange, :arguments => { :type => 'private', :nick => nickname })
Pour envoyer un message privé à un utilisateur, il suffira de préciser, dans l'entête du message, que l'exchange doit router le message sur une queue de type private pour le nick de l'utilisateur à qui nous voulons envoyer notre message.
exchange.publish message, :headers => {:type => 'private', :nick => nickname}
Pour les messages systèmes, nous utiliserons simplement une queue avec comme type la valeur system :
system_queue = channel.queue("system.#{nickname}", :auto_delete => true)
system_queue.bind(exchange, :arguments => { :type => 'system' })
Nous utiliserons cette queue pour la commande /who. En effet, pour répondre à cette demande, nous enverrons un message de type system à chaque utilisateur connecté, avec dans ce message, uniquement le nickname de l'utilisateur qui fait la demande. Chaque utilisateur recevant ce message, renverra automatiquement une réponse privée (du genre "<nickname> is here") à l'utilisateur ayant fait la demande.
system_queue.bind(exchange, :arguments => { :type => 'system' }).subscribe do |headers, payload|
exchange.publish "-- #{nickname} is here", :headers => {:type => 'private', :nick => payload.to_s}
end
Voici le code de notre seconde version :
1 #!/usr/bin/env ruby
2
3 require "rubygems"
4 require "amqp"
5
6 def help
7 puts <<EOH
8 /help : display this help
9 /me <message> : send an action message
10 /privmsg <nickname> <message> : send a private message
11 /quit : Quit AMQP chat
12 /who : ask who is here
13 EOH
14 end
15
16 nickname = ARGV[0]
17 raise "Usage : #{$0} <nickname>" if nickname.nil?
18
19 AMQP.start("amqp://localhost") do |connection|
20 channel = AMQP::Channel.new(connection)
21 exchange = channel.headers("amqp.chat.v2")
22
23 public_queue = channel.queue("public.#{nickname}", :auto_delete => true)
24 public_queue.bind(exchange, :arguments => { :type => 'public' }).subscribe do |headers, payload|
25 puts payload.to_s
26 end
27 private_queue = channel.queue("private.#{nickname}", :auto_delete => true)
28 private_queue.bind(exchange, :arguments => { :type => 'private', :nick => nickname }).subscribe do |headers, payload|
29 puts payload.to_s
30 end
31 system_queue = channel.queue("system.#{nickname}", :auto_delete => true)
32 system_queue.bind(exchange, :arguments => { :type => 'system' }).subscribe do |headers, payload|
33 exchange.publish "-- #{nickname} is here", :headers => {:type => 'private', :nick => payload.to_s}
34 end
35
36 exchange.publish "** #{nickname} enter", :headers => {:type => 'public'}
37
38 Thread.new do
39 while true
40 message = STDIN.gets.chomp
41 case message
42 when /^\/quit\s*(.*)/
43 exchange.publish "** #{nickname} has quit (#{$1})", :headers => {:type => 'public'}
44 connection.close {
45 EventMachine.stop
46 }
47 break
48 when /^\/me\s*(.*)/
49 exchange.publish "** #{nickname} #{$1}", :headers => {:type => "public"}
50 when /^\/privmsg\s*([^\s]*)\s*(.*)/
51 exchange.publish ">> [#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{$2}", :headers => {:type => 'private', :nick => $1}
52 exchange.publish ">> [#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{$2}", :headers => {:type => 'private', :nick => nickname}
53 when /^\/who/
54 exchange.publish nickname, :headers => {:type => "system"}
55 when /^\/help/
56 help
57 else
58 exchange.publish "[#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{message}", :headers => {:type => 'public'}
59 end
60 end
61 end
62 end
Et voici une capture :

Troisième version : une interface en Ncurses
Nous avons une version fonctionnelle, mais pas vraiment conviviale. En effet, si nous recevons un message, alors que nous sommes en train d'en rédiger un, c'est un peu le foutoir.
Nous allons améliorer cela en mettant en place une interface utilisateur grâce à Ncurses, et plus précisément, dans notre cas, à ncurses-ruby.
1 #!/usr/bin/env ruby
2
3 require "rubygems"
4 require "ncurses"
5 require "amqp"
6
7 class ChatGui
8 def read_line(y, x,
9 window = Ncurses.stdscr,
10 max_len = (window.getmaxx - x - 1),
11 string = "",
12 cursor_pos = 0)
13 loop do
14 window.mvaddstr(y,x,string)
15 window.move(y,x+cursor_pos)
16 ch = window.getch
17 case ch
18 when Ncurses::KEY_ENTER, ?\n.ord, ?\r.ord
19 return string
20 when Ncurses::KEY_BACKSPACE, 127
21 string = string[0...([0, cursor_pos-1].max)] + string[cursor_pos..-1]
22 cursor_pos = [0, cursor_pos-1].max
23 window.mvaddstr(y, x+string.length, " ")
24 when (" "[0].ord..255)
25 if (cursor_pos < max_len)
26 string[cursor_pos,0] = ch.chr
27 cursor_pos += 1
28 else
29 Ncurses.beep
30 end
31 else
32 Ncurses.beep
33 end
34 end
35 end
36
37 def add_message(message)
38 @messages += message.split("\n")
39 if @messages.size > @max_messages
40 @messages.shift
41 end
42
43 refresh_messages_window
44 end
45
46 def refresh_messages_window
47 @messages_window.clear
48 y = 0
49 @messages.each do |message|
50 @messages_window.mvaddstr(y, 0, message)
51 y = y + 1
52 end
53 @messages_window.refresh
54 end
55
56 def initialize(nick)
57 @messages = []
58 Ncurses.initscr
59 Ncurses.cbreak
60 Ncurses.noecho
61 Ncurses.keypad(Ncurses.stdscr, true)
62
63 @window = Ncurses.stdscr
64 @maxy = @window.getmaxy - 1
65 @maxx = @window.getmaxx - 1
66
67 @prompt_window = Ncurses.newwin(2, @maxx, @maxy - 2, 0)
68 @prompt = "#{nick} >"
69
70 @messages_window = Ncurses.newwin(@maxy - 2, @maxx, 0, 0)
71 @max_messages = @messages_window.getmaxy
72 end
73
74 def run(&b)
75 loop do
76 # refresh_messages_window
77
78 @prompt_window.mvaddstr(0, 0, "-"*@maxx)
79 @prompt_window.mvaddstr(1, 0, @prompt)
80 message = read_line(1, @prompt.length + 1, @prompt_window)
81 yield message
82 @prompt_window.clear
83 end
84 end
85
86 def quit
87 Ncurses.endwin
88 end
89 end
90
91 # -- main --
92
93 def help(gui)
94 gui.add_message <<EOH
95 /help : display this help
96 /me <message> : send an action message
97 /privmsg <nickname> <message> : send a private message
98 /quit : Quit AMQP chat
99 /who : ask who is here
100 EOH
101 end
102
103 nickname = ARGV[0]
104 raise "Usage : #{$0} <nickname>" if nickname.nil?
105
106 gui = ChatGui.new(nickname)
107
108 AMQP.start("amqp://localhost") do |connection|
109 channel = AMQP::Channel.new(connection)
110 exchange = channel.headers("amqp.chat.v3")
111
112 public_queue = channel.queue("public.#{nickname}", :auto_delete => true)
113 public_queue.bind(exchange, :arguments => { :type => 'public' }).subscribe do |headers, payload|
114 gui.add_message payload.to_s
115 end
116 private_queue = channel.queue("private.#{nickname}", :auto_delete => true)
117 private_queue.bind(exchange, :arguments => { :type => 'private', :nick => nickname }).subscribe do |headers, payload|
118 gui.add_message payload.to_s
119 end
120 system_queue = channel.queue("system.#{nickname}", :auto_delete => true)
121 system_queue.bind(exchange, :arguments => { :type => 'system' }).subscribe do |headers, payload|
122 exchange.publish "-- #{nickname} is here", :headers => {:type => 'private', :nick => payload.to_s}
123 end
124
125 exchange.publish "** #{nickname} enter", :headers => {:type => 'public'}
126
127 Thread.new do
128 gui.run do |message|
129 case message
130 when /^\/quit\s*(.*)/
131 exchange.publish "** #{nickname} has quit (#{$1})", :headers => {:type => 'public'}
132 connection.close {
133 EventMachine.stop
134 }
135 break
136 when /^\/me\s*(.*)/
137 exchange.publish "** #{nickname} #{$1}", :headers => {:type => "public"}
138 when /^\/privmsg\s*([^\s]*)\s*(.*)/
139 exchange.publish ">> [#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{$2}", :headers => {:type => 'private', :nick => $1}
140 exchange.publish ">> [#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{$2}", :headers => {:type => 'private', :nick => nickname}
141 when /^\/who/
142 exchange.publish nickname, :headers => {:type => "system"}
143 when /^\/help/
144 help gui
145 else
146 exchange.publish "[#{Time.now.strftime('%H:%M:%S')}] #{nickname} : #{message}", :headers => {:type => 'public'}
147 end
148 end
149 end
150 end
151
152 gui.quit
La classe ChatGui permet de gérer la partie interface avec Ncurses. Pour cela, nous mettons en place deux fenêtres, @prompt_window et @messages_window servant respectivement à la saisie des messages et à l'affichage de ces derniers. La saisie des messages est gérée par la méthode read_line. L'ajout de nouveaux messages et le rafraichissement de la fenêtre @messages_window sont dévolus à la méthode add_message. La méthode run prend en paramètre un bloc recevant en argument le message venant d'être saisi par l'utilisateur.
Le reste du code n'est rien de plus qu'une adaptation de notre seconde version ;)
Voici ce que donne cette troisième version :

Si vous souhaitez vous amuser, sachez que le code est également disponible ici.
1 Source : http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol
2 Si jamais vous souhaiter poussez le jeu un peu plus loin, voici la spécification d'AMQP 1.0
3 N'hésitez pas à compléter cette liste en commentaire.