Skip to content
This repository was archived by the owner on Aug 19, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ Wray is only a client for Faye. You will need to setup a server using Ruby or No

###Subscribing to channels

```
```go
package main

import "github.com/pythonandchips/wray"
import "fmt"
import "time"

func main() {
//register the types of transport you want available. Only long-polling is currently supported
Expand All @@ -26,24 +27,47 @@ func main() {
client := wray.NewFayeClient("http://localhost:5000/faye")

//subscribe to the channels you want to listen to
client.Subscribe("/foo", false, func(message wray.Message) {
_, err := client.Subscribe("/foo", false, func(message wray.Message) {
fmt.Println("-------------------------------------------")
fmt.Println(message.Data)
})

if err != nil {
fmt.Println("Subscription to /foo failed", err)
}

//wildcards can be used to subscribe to multipule channels
client.Subscribe("/foo/*", false, func(message wray.Message) {
promise, _ = client.Subscribe("/foo/*", false, func(message wray.Message) {
fmt.Println("-------------------------------------------")
fmt.Println(message.Data)
})

if !promise.Successful() {
fmt.Println("Subscription to /foo/* failed", promise.Error())
}

// try to subscribe forever
for {
_, err = client.Subscribe("/foo/*", false, func(message wray.Message) {
fmt.Println("-------------------------------------------")
fmt.Println(message.Data)
})

if err == nil {
break // break out of the loop if there was no error
}

time.Sleep(1*time.Second)
}

//start listening on all subscribed channels and hold the process open
client.Listen()
}
```

###Publishing to channels
```

```go
package main

import "github.com/pythonandchips/wray"
Expand Down
34 changes: 30 additions & 4 deletions go_faye.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package wray

import (
"errors"
"fmt"
"path/filepath"
"strings"
Expand Down Expand Up @@ -43,6 +44,15 @@ type Subscription struct {

type SubscriptionPromise struct {
subscription Subscription
subError error
}

func (self SubscriptionPromise) Error() error {
return self.subError
}

func (self SubscriptionPromise) Successful() bool {
return self.subError == nil
}

func NewFayeClient(url string) *FayeClient {
Expand Down Expand Up @@ -80,16 +90,32 @@ func (self *FayeClient) handshake() {
}
}

func (self *FayeClient) Subscribe(channel string, force bool, callback func(Message)) SubscriptionPromise {
func (self *FayeClient) Subscribe(channel string, force bool, callback func(Message)) (promise SubscriptionPromise, err error) {
if self.state == UNCONNECTED {
self.handshake()
}
subscriptionParams := map[string]interface{}{"channel": "/meta/subscribe", "clientId": self.clientId, "subscription": channel, "id": "1"}
subscription := Subscription{channel: channel, callback: callback}
//TODO: deal with subscription failures
self.transport.send(subscriptionParams)

res, err := self.transport.send(subscriptionParams)
promise = SubscriptionPromise{subscription, nil}

if err != nil {
promise.subError = err
return
}

if !res.successful {
// TODO: put more information in the error message about why it failed
err = errors.New("Response was unsuccessful")
promise.subError = err
return
}

// don't add to the subscriptions until we know it succeeded
self.subscriptions = append(self.subscriptions, subscription)
return SubscriptionPromise{subscription}

return
}

func (self *FayeClient) handleResponse(response Response) {
Expand Down
62 changes: 51 additions & 11 deletions go_faye_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestSubscribe(t *testing.T) {
var fakeHttpTransport *FakeHttpTransport
var subscriptionParams map[string]interface{}
var response Response
var err error
Given(func() {
response = Response{id: "1", channel: "/meta/handshake", successful: true, clientId: "client4", supportedConnectionTypes: []string{"long-polling"}}
})
Expand All @@ -53,8 +54,9 @@ func TestSubscribe(t *testing.T) {
subscriptionParams = map[string]interface{}{"channel": "/meta/subscribe", "clientId": response.clientId, "subscription": "/foo/*", "id": "1"}
})
Given(func() { callback = func(message Message) {} })
When(func() { subscriptionPromise = fayeClient.Subscribe("/foo/*", false, callback) })
When(func() { subscriptionPromise, err = fayeClient.Subscribe("/foo/*", false, callback) })
Convey("connects the faye client", func() {
Then(func() { So(err, ShouldEqual, nil) })
Then(func() { So(fayeClient.state, ShouldEqual, CONNECTED) })
})
Convey("add the subscription to the client", func() {
Expand All @@ -74,6 +76,41 @@ func TestSubscribe(t *testing.T) {
})
}

func TestSubscriptionError(t *testing.T) {
Convey("subscribe to a channel when unconnected", t, func() {
var fayeClient FayeClient
var callback func(Message)
var subscriptionPromise SubscriptionPromise
var fakeHttpTransport *FakeHttpTransport
var subscriptionParams map[string]interface{}
var failedResponse Response
var clientId string = "client1"
var err error
Given(func() {
failedResponse = Response{id: "1", channel: "/meta/subscribe", successful: false, clientId: clientId, supportedConnectionTypes: []string{"long-polling"}}
})
Given(func() { fakeHttpTransport = &FakeHttpTransport{usable: true, response: failedResponse} })
Given(func() { registeredTransports = []Transport{fakeHttpTransport} })
Given(func() { fayeClient = BuildFayeClient().WithTransport(fakeHttpTransport).Client() })
Given(func() { fayeClient.state = CONNECTED })
Given(func() {
subscriptionParams = map[string]interface{}{"channel": "/meta/subscribe", "clientId": clientId, "subscription": "/foo/*", "id": "1"}
})
Given(func() { callback = func(message Message) {} })
When(func() { subscriptionPromise, err = fayeClient.Subscribe("/foo/*", false, callback) })
Convey("fails to subscribe", func() {
Then(func() { So(err, ShouldNotEqual, nil) })
Then(func() { So(subscriptionPromise.Successful(), ShouldEqual, false) })
})
Convey("not add the subscription to the client", func() {
Then(func() { So(len(fayeClient.subscriptions), ShouldEqual, 0) })
})
Convey("the client send the subscription to the server", func() {
Then(func() { So(fakeHttpTransport.sentParams, ShouldResemble, subscriptionParams) })
})
})
}

func TestPerformHandshake(t *testing.T) {
Convey("successful handshake with server", t, func() {
var fayeClient FayeClient
Expand Down Expand Up @@ -163,29 +200,32 @@ func TestHandleResponse(t *testing.T) {
var subscriptions []Subscription
var firstParams map[string]interface{}
var secondParams map[string]interface{}
var firstMessages []Message
var secondMessages []Message
var firstMessages []map[string]interface{}
var secondMessages []map[string]interface{}
Given(func() {
subscriptions = []Subscription{Subscription{"/foo/bar", func(message Message) { firstMessages = append(firstMessages, message) }},
Subscription{"/foo/*", func(message Message) { secondMessages = append(secondMessages, message) }},
subscriptions = []Subscription{
{"/foo/bar", func(message Message) { firstMessages = append(firstMessages, message.Data) }},
{"/foo/*", func(message Message) { secondMessages = append(secondMessages, message.Data) }},
}
})
Given(func() { firstParams = map[string]interface{}{"foo": "bar"} })
Given(func() { secondParams = map[string]interface{}{"baz": "qux"} })
Given(func() { fayeClient = BuildFayeClient().WithSubscriptions(subscriptions).Client() })
Given(func() {
messages = []Message{Message{Channel: "/foo/bar", Id: "1", Data: firstParams},
Message{Channel: "/foo/quz", Id: "1", Data: secondParams}}
messages = []Message{
{Channel: "/foo/bar", Id: "1", Data: firstParams},
{Channel: "/foo/quz", Id: "1", Data: secondParams},
}
})
Given(func() { response = Response{messages: messages, channel: "/meta/connect", clientId: "client1"} })
When(func() { fayeClient.handleResponse(response) })
//need a very short sleep in here to allow the go routines to complete
//as all they are doing is assigning a variable 10 milliseconds shoule be more than enough
Wait(10 * time.Millisecond)
Then(func() { So(firstMessages[0].Data, ShouldResemble, firstParams) })
Wait(100 * time.Millisecond)
Then(func() { So(firstMessages, ShouldContain, firstParams) })
Then(func() { So(len(secondMessages), ShouldEqual, 2) })
Then(func() { So(secondMessages[0].Data, ShouldResemble, firstParams) })
Then(func() { So(secondMessages[1].Data, ShouldResemble, secondParams) })
Then(func() { So(secondMessages, ShouldContain, firstParams) })
Then(func() { So(secondMessages, ShouldContain, secondParams) })
})
}

Expand Down
54 changes: 34 additions & 20 deletions response.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,75 @@
package wray

type Response struct {
id string
channel string
successful bool
clientId string
id string
channel string
successful bool
clientId string
supportedConnectionTypes []string
messages []Message
error error
messages []Message
error error
}

type Message struct {
Channel string
Id string
Data map[string]interface{}
Id string
Data map[string]interface{}
}

func newResponse(data []interface{}) Response {
headerData := data[0].(map[string]interface{})
messagesData := data[1.:]
messages := parseMessages(messagesData)

var id string
if headerData["id"] != nil {
id = headerData["id"].(string)
}

supportedConnectionTypes := []string{}

if headerData["supportedConnectionTypes"] != nil {
d := headerData["supportedConnectionTypes"].([]interface{})
for _, sct := range(d) {
for _, sct := range d {
supportedConnectionTypes = append(supportedConnectionTypes, sct.(string))
}
}

var clientId string
if headerData["clientId"] != nil {
clientId = headerData["clientId"].(string)
}
return Response{id: id,
clientId: clientId,
channel: headerData["channel"].(string),
successful: headerData["successful"].(bool),
messages: messages,
supportedConnectionTypes: supportedConnectionTypes}

return Response{
id: id,
clientId: clientId,
channel: headerData["channel"].(string),
successful: headerData["successful"].(bool),
messages: messages,
supportedConnectionTypes: supportedConnectionTypes,
}
}

func parseMessages(data []interface{}) []Message {
messages := []Message{}
for _, messageData := range(data) {

for _, messageData := range data {

m := messageData.(map[string]interface{})
var id string

if m["id"] != nil {
id = m["id"].(string)
}
message := Message{Channel: m["channel"].(string),
Id: id,
Data: m["data"].(map[string]interface{})}

message := Message{
Channel: m["channel"].(string),
Id: id,
Data: m["data"].(map[string]interface{}),
}

messages = append(messages, message)
}

return messages
}