go_reader.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package pubsub
  2. import (
  3. "encoding/json"
  4. "time"
  5. )
  6. func go_reader(c *Connection) {
  7. go func(c *Connection) {
  8. for {
  9. select {
  10. case <-c.done:
  11. return
  12. default:
  13. if c.active {
  14. _, msg, err := c.Connection.ReadMessage()
  15. if err != nil {
  16. c.onError(err)
  17. c.active = false
  18. c.onDisconnect()
  19. // Wait 1 second or return immediately
  20. select {
  21. case <-time.After(time.Second):
  22. case <-c.done:
  23. return
  24. }
  25. } else {
  26. var resp Response
  27. if err := json.Unmarshal(msg, &resp); err != nil {
  28. c.onError(err)
  29. } else {
  30. if resp.Type == "PONG" {
  31. ct := time.Now()
  32. c.onPong(c.ping_start, ct)
  33. c.ping_start = ct
  34. c.ping_sended = false
  35. } else if resp.Type == "RECONNECT" {
  36. c.onInfo("warning, got RECONNECT response")
  37. c.active = false
  38. c.onDisconnect()
  39. c.ping_start = time.Now()
  40. c.ping_sended = false
  41. if err := c.Connection.Close(); err != nil {
  42. c.onError(err)
  43. }
  44. } else if resp.Type == "RESPONSE" {
  45. // TODO: {"type":"RESPONSE","error":"","nonce":""}
  46. } else {
  47. c.onMessage(msg)
  48. }
  49. }
  50. }
  51. } else {
  52. // Wait 1 second or return immediately
  53. select {
  54. case <-time.After(time.Second):
  55. case <-c.done:
  56. return
  57. }
  58. }
  59. }
  60. }
  61. }(c)
  62. }