go_reader.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package pubsub
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "time"
  6. )
  7. func go_reader(c *Connection) {
  8. go func(c *Connection) {
  9. for {
  10. select {
  11. case <-c.done:
  12. return
  13. default:
  14. if c.active {
  15. _, msg, err := c.Connection.ReadMessage()
  16. if err != nil {
  17. c.onError(err)
  18. c.active = false
  19. c.onDisconnect()
  20. // Wait 1 second or return immediately
  21. select {
  22. case <-time.After(time.Second):
  23. case <-c.done:
  24. return
  25. }
  26. } else {
  27. var answer Answer
  28. if err := json.Unmarshal(msg, &answer); err != nil {
  29. c.onError(err)
  30. } else {
  31. if answer.Type == Pong {
  32. ct := time.Now()
  33. c.onPong(c.ping_start, ct)
  34. c.ping_start = ct
  35. c.ping_sended = false
  36. } else if answer.Type == Reconnect {
  37. c.onInfo(fmt.Sprintf("warning, got %s response", Reconnect))
  38. c.active = false
  39. c.onDisconnect()
  40. c.ping_start = time.Now()
  41. c.ping_sended = false
  42. if err := c.Connection.Close(); err != nil {
  43. c.onError(err)
  44. }
  45. } else if answer.Type == Response {
  46. // TODO: {"type":"RESPONSE","error":"","nonce":""}
  47. } else {
  48. c.onMessage(msg)
  49. }
  50. }
  51. }
  52. } else {
  53. // Wait 1 second or return immediately
  54. select {
  55. case <-time.After(time.Second):
  56. case <-c.done:
  57. return
  58. }
  59. }
  60. }
  61. }
  62. }(c)
  63. }