Tuesday, January 4, 2011

RabbitMQ, monadically

This is just so cool I have to write a little bit about it even before I clean it up and prettify it. One potential use of delimited continuations is to enable a beautifully simple monadic interface to input streams. Here's an image of a session in operation.



As you can see, the primary usage pattern looks like this:

for( msg <- rabbitMonadically.beginService() ) {
// msg handling code goes here
...
}

To me this is very intuitive. We are iterating over a stream of messages and handling them in the code body of the "for-loop". All the threading and state management that supports this view is safely tucked away behind this interface.

Oleg Kiselyov has been the most eloquent proponent of this idea, and I cannot hope to match the elegance of his code or his prose. However, it's still a great deal of fun to do something for yourself to test your understanding. So, let's implement this design pattern in Scala over RabbitMQ.

It turns out that Tiark Rompf has already done most of the work for us in one of the test suites for delimited continuations! He did it for a stream of HTTP requests, so his job was actually a little bit harder because he had to deal with TCP/IP socket requests as well. We just crib his code and modify it to suit our purposes. Likewise, I've been cribbing from and comparing with the Lift actor-based API to RabbitMQ. I leave it to you to do the comparison in terms of understandability.

BTW, if you'd like to see more of this -- and in nice book format to boot! -- then please consider making a pledge to the Kickstarter project for the Monadic Design Patterns for the Web book.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223




// -*- mode: Scala;-*-
// Filename: Monadic.scala
// Authors: lgm
// Creation: Wed Dec 29 13:57:38 2010
// Copyright: Not supplied
// Description:
// ------------------------------------------------------------------------

package net.liftweb.amqp

import scala.util.continuations._
//import scala.util.continuations.ControlContext._

import scala.concurrent.{Channel => Chan, _}
import scala.concurrent.cpsops._

import _root_.com.rabbitmq.client._
import _root_.scala.actors.Actor

import _root_.java.io.ObjectInputStream
import _root_.java.io.ByteArrayInputStream
import _root_.java.util.Timer
import _root_.java.util.TimerTask

// Exposes messages arriving on a RabbitMQ queue as a generator that can be
// consumed with a `for` loop. T is the type each message body is cast to
// after Java deserialization (see readT). The implementation is built on
// delimited continuations (shift/reset from scala.util.continuations).
trait MonadicAMQPDispatcher[T]
extends FJTaskRunners {

// Type of the value returned by channel.accessRequest and passed on to the
// exchange/queue declaration and consume calls below.
type Ticket = Int

// Minimal "things you can loop over" interface. A `for ( x <- g ) { ... }`
// loop without `yield` is rewritten by the compiler to `g.foreach( ... )`,
// so defining foreach is all that is needed to support that syntax.
//   A : type of the elements handed to the loop body
//   B : result type of the loop body
//   C : result type of the whole loop
trait Generable[+A,-B,+C] {
// The generator proper: given the loop body, drive it.
def funK : (A => (B @suspendable)) => (C @suspendable)

// Hands the loop body f straight to funK.
def foreach( f : (A => B @suspendable) ) : C @suspendable = {
funK( f )
}
}

// Concrete Generable that simply stores the supplied funK.
case class Generator[+A,-B,+C](
override val funK : (A => (B @suspendable)) => (C @suspendable)
) extends Generable[A,B,C] {
}

// DefaultConsumer bound to a channel whose handleDelivery is re-declared
// without a body, so every concrete subclass has to supply one.
// NOTE(review): the type parameter T here shadows the enclosing trait's T
// and is not used anywhere in this class body.
abstract class SerializedConsumer[T](
val channel : Channel
) extends DefaultConsumer( channel ) {
override def handleDelivery(
tag : String,
env : Envelope,
props : AMQP.BasicProperties,
body : Array[Byte]
)
}

// Generator yielding a single Channel: builds a ConnectionFactory from
// params, opens a connection to host:port, creates one channel on it and
// passes that channel to the loop body k exactly once.
// NOTE(review): nothing in this file closes the connection or the channel;
// confirm that the lifetime is managed elsewhere or is meant to be
// process-long.
def acceptConnections(
params : ConnectionParameters,
host : String,
port : Int
) =
Generator {
k : ( Channel => Unit @suspendable ) => {
//shift {
//innerk : (Unit => Unit @suspendable) => {
val factory = new ConnectionFactory( params )
val connection = factory.newConnection( host, port )
val channel = connection.createChannel()
k( channel );
//}
//}
}
}

// Generator yielding the deserialized messages of type T. For the channel
// produced by acceptConnections it spawns a task that requests an access
// ticket for "/data", declares the direct exchange "mult" and the queue
// "mult_queue", binds them with routing key "routeroute", and then feeds
// every value produced by readT to the loop body k.
// (`spawn` is not defined in this file; presumably it comes from
// FJTaskRunners or the scala.concurrent imports -- TODO confirm.)
def beginService(
params : ConnectionParameters,
host : String,
port : Int
) = Generator {
k : ( T => Unit @suspendable ) =>
//shift {
println( "The rabbit is running... (with apologies to John Updike)" )

for( channel <- acceptConnections( params, host, port ) ) {
spawn {
// Open bracket
println( "Connected: " + channel )
val ticket = channel.accessRequest( "/data" )
channel.exchangeDeclare( ticket, "mult", "direct" )
channel.queueDeclare( ticket, "mult_queue" )
channel.queueBind( ticket, "mult_queue", "mult", "routeroute" )

for ( t <- readT( channel, ticket ) ) { k( t ) }

// println( "Disconnected: " + channel )
// Close bracket
}
}
//}
}

// Plain value bundling the four arguments a consumer's handleDelivery
// receives, so that a delivery can be passed around as a single object.
case class AMQPDelivery(
tag : String,
env : Envelope,
props : AMQP.BasicProperties,
body : Array[Byte]
)

// Generator yielding one AMQPDelivery per message delivered on
// "mult_queue". It captures the continuation of the loop (outerk),
// registers a consumer on the channel and returns; the loop body k is then
// run from the consumer's handleDelivery callback.
def callbacks( channel : Channel, ticket : Ticket) =
Generator {
k : ( AMQPDelivery => Unit @suspendable) =>

println("level 1 callbacks")

shift {
// outerk is whatever follows the enclosing `for` loop in the caller.
outerk : (Unit => Any) =>

// Consumer that turns each delivery into one run of the loop body.
object TheRendezvous
extends SerializedConsumer[T]( channel ) {
override def handleDelivery(
tag : String,
env : Envelope,
props : AMQP.BasicProperties,
body : Array[Byte]
) {
// Run the loop body in a spawned task rather than directly inside the
// callback.
spawn {
println("before continuation in callback")

k( AMQPDelivery( tag, env, props, body ) )

println("after continuation in callback")

// The captured continuation is resumed after every delivery, i.e. it is
// invoked once per message rather than once overall.
outerk()
}
}
}

println("before registering callback")

// NOTE(review): the third argument is presumably the noAck flag; false
// would mean explicit acknowledgement, which matches the basicAck call in
// readT -- confirm against the client API version in use.
channel.basicConsume(
ticket,
"mult_queue",
false,
TheRendezvous
)

println("after registering callback")
// stop
}
}

// Generator yielding values of type T: for each AMQPDelivery produced by
// callbacks, the body bytes are read back with Java object serialization,
// cast to T, passed to the loop body k, and the delivery is then
// acknowledged.
def readT( channel : Channel, ticket : Ticket ) =
Generator {
k: ( T => Unit @suspendable) =>
shift {
outerk: (Unit => Unit) =>
reset {

for (
amqpD <- callbacks( channel, ticket )
) {
// NOTE(review): routingKey and contentType are read but never used below.
val routingKey = amqpD.env.getRoutingKey
val contentType = amqpD.props.contentType
val deliveryTag = amqpD.env.getDeliveryTag
// NOTE(review): this is Java deserialization of bytes received from the
// broker; confirm that every publisher on this queue is trusted.
val in =
new ObjectInputStream(
new ByteArrayInputStream( amqpD.body )
)
// T is a type parameter, so this cast is unchecked; a payload of the
// wrong class is not detected on this line.
val t = in.readObject.asInstanceOf[T];
k( t )
// Acknowledge only after the loop body has been run for this message.
// NOTE(review): the second argument is presumably the `multiple` flag --
// confirm.
channel.basicAck(deliveryTag, false);

// Is this necessary?
// Captures the rest of the loop body and resumes it immediately.
shift { k : ( Unit => Unit ) => k() }
}

// Reached when the `for` above returns; the message text still says
// "readBytes" although this method is readT.
println("readBytes returning")
outerk()
}
}
}

}

// MonadicAMQPDispatcher bound to a fixed broker address.
//
// The two methods below are zero-argument overloads of the trait's
// acceptConnections / beginService: they take the ConnectionParameters
// from implicit scope and supply this instance's host and port.
//
//   host : broker host name handed to the underlying connection factory
//   port : broker port handed to the underlying connection factory
//
// MonadicAMQPDispatcher is a trait and takes no constructor arguments, so
// it is extended without an argument list.
class StdMonadicAMQPDispatcher[T](
  val host : String,
  val port : Int
) extends MonadicAMQPDispatcher[T] {

  // Generator over the channel opened to host:port with the implicit params.
  def acceptConnections()( implicit params : ConnectionParameters )
  : Generator[Channel,Unit,Unit] =
    acceptConnections( params, host, port )

  // Generator over the messages of type T read from host:port with the
  // implicit params.
  def beginService()( implicit params : ConnectionParameters )
  : Generator[T,Unit,Unit] =
    beginService( params, host, port )
}

// Companion of StdMonadicAMQPDispatcher: factory, extractor and a set of
// default implicit values for connecting to a broker.
object StdMonadicAMQPDispatcher {

  // Builds a dispatcher for the given broker address.
  def apply[T] (
    host : String, port : Int
  ) : StdMonadicAMQPDispatcher[T] =
    new StdMonadicAMQPDispatcher[T]( host, port )

  // Extractor yielding the dispatcher's ( host, port ) pair.
  def unapply[T](
    smAMQPD : StdMonadicAMQPDispatcher[T]
  ) : Option[(String,Int)] =
    Some( ( smAMQPD.host, smAMQPD.port ) )

  // Creates a fresh ConnectionParameters on every call, populated with
  // user "guest", password "guest", virtual host "/" and a requested
  // heartbeat of 0.
  def stdCnxnParams : ConnectionParameters = {
    val params = new ConnectionParameters
    params.setUsername( "guest" )
    params.setPassword( "guest" )
    params.setVirtualHost( "/" )
    params.setRequestedHeartbeat( 0 )
    params
  }

  // The vals of an object are initialized in textual order, so the
  // parameters must be defined BEFORE the factory that is built from them;
  // in the opposite order the factory would be constructed with null.
  implicit val defaultConnectionParameters : ConnectionParameters =
    stdCnxnParams
  implicit val defaultConnectionFactory : ConnectionFactory =
    new ConnectionFactory( defaultConnectionParameters )

  // NOTE(review): implicits of very general types (String, Int) are picked
  // up by any implicit String/Int parameter wherever this object's members
  // are imported; kept unchanged for compatibility with existing callers.
  implicit val defaultHost : String = "localhost"
  implicit val defaultPort : Int = 5672
}




1 comment:

siryc said...

Cool.... Unfortunately continuations is a little hard for me.
But sounds very interesting. Thanks.