go_reader.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package pubsub
  2. import (
  3. "encoding/json"
  4. "time"
  5. )
  6. func go_reader(c *Connection) {
  7. go func(c *Connection) {
  8. for {
  9. select {
  10. case <-c.done:
  11. return
  12. default:
  13. if c.active {
  14. _, msg, err := c.Connection.ReadMessage()
  15. if err != nil {
  16. c.onError(err)
  17. c.active = false
  18. c.onDisconnect()
  19. // Wait 1 second or return immediately
  20. select {
  21. case <-time.After(time.Second):
  22. case <-c.done:
  23. return
  24. }
  25. } else {
  26. var resp struct {
  27. Type string `json:"type"`
  28. }
  29. if err := json.Unmarshal(msg, &resp); err != nil {
  30. c.onError(err)
  31. } else {
  32. if resp.Type == "PONG" {
  33. ct := time.Now()
  34. c.onPong(c.ping_start, ct)
  35. c.ping_start = ct
  36. c.ping_sended = false
  37. } else if resp.Type == "RECONNECT" {
  38. c.onInfo("warning, got RECONNECT response")
  39. c.active = false
  40. c.onDisconnect()
  41. c.ping_start = time.Now()
  42. c.ping_sended = false
  43. if err := c.Connection.Close(); err != nil {
  44. c.onError(err)
  45. }
  46. } else if resp.Type == "RESPONSE" {
  47. // TODO: {"type":"RESPONSE","error":"","nonce":""}
  48. } else {
  49. c.onMessage(msg)
  50. }
  51. }
  52. }
  53. } else {
  54. // Wait 1 second or return immediately
  55. select {
  56. case <-time.After(time.Second):
  57. case <-c.done:
  58. return
  59. }
  60. }
  61. }
  62. }
  63. }(c)
  64. }