diff --git a/Readme.md b/Readme.md index cebee73..aa8a2ee 100644 --- a/Readme.md +++ b/Readme.md @@ -49,12 +49,27 @@ Most of the examples connect to a redis database running in the default port -- } client.Del("l") +### Publish/Subscribe + sub := make(chan string, 1) + sub <- "foo" + messages := make(chan Message, 0) + go client.Subscribe(sub, nil, nil, nil, messages) + + time.Sleep(10 * 1000 * 1000) + client.Publish("foo", []byte("bar")) + + msg := <-messages + println("received from:", msg.Channel, " message:", string(msg.Message)) + + close(sub) + close(messages) + + More examples coming soon. See `redis_test.go` for more usage examples. ## Commands not supported yet * MULTI/EXEC/DISCARD/WATCH/UNWATCH -* SUBSCRIBE/UNSUBSCRIBE/PUBLISH * SORT * ZUNIONSTORE / ZINTERSTORE diff --git a/redis.go b/redis.go index 065866c..bf60a96 100644 --- a/redis.go +++ b/redis.go @@ -24,8 +24,6 @@ type Client struct { Addr string Db int Password string - //the channel for pub/sub commands - Messages chan []byte //the connection pool pool chan *net.TCPConn } @@ -1276,13 +1274,18 @@ func (client *Client) Hgetall(key string, val interface{}) os.Error { //Publish/Subscribe +// Container for messages received from publishers on channels that we're subscribed to. type Message struct { ChannelMatched string Channel string Message []byte } -// Subscribe to channels, will block until the subscribe channel is closed. +// Subscribe to redis serve channels, this method will block until one of the sub/unsub channels are closed. +// There are two pairs of channels subscribe/unsubscribe & psubscribe/punsubscribe. +// The former does an exact match on the channel, the later uses glob patterns on the redis channels. +// Closing either of these channels will unblock this method call. +// Messages that are received are sent down the messages channel. func (client *Client) Subscribe(subscribe <-chan string, unsubscribe <-chan string, psubscribe <-chan string, punsubscribe <-chan string, messages chan<- Message) os.Error { cmds := make(chan []string, 0) data := make(chan interface{}, 0) @@ -1344,6 +1347,7 @@ func (client *Client) Subscribe(subscribe <-chan string, unsubscribe <-chan stri return err } +// Publish a message to a redis server. func (client *Client) Publish(channel string, val []byte) os.Error { _, err := client.sendCommand("PUBLISH", channel, string(val)) if err != nil { diff --git a/redis_test.go b/redis_test.go index c89960d..f197780 100644 --- a/redis_test.go +++ b/redis_test.go @@ -291,6 +291,25 @@ func TestSubscribe(t *testing.T) { close(subscribe) } +func TestSimpleSubscribe(t *testing.T) { + sub := make(chan string, 1) + messages := make(chan Message, 0) + go client.Subscribe(sub, nil, nil, nil, messages) + + sub <- "foo" + time.Sleep(10 * 1000 * 1000) // 10ms + data := "bar" + client.Publish("foo", []byte(data)) + + msg := <-messages + if string(msg.Message) != data { + t.Fatalf("Expected %s but got %s", data, string(msg.Message)) + } + + close(sub) + close(messages) +} + func TestUnsubscribe(t *testing.T) { subscribe := make(chan string, 0) unsubscribe := make(chan string, 0)