go_reader.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package pubsub
  2. import (
  3. "encoding/json"
  4. "time"
  5. )
  6. func go_reader(c *Connection) {
  7. go func(c *Connection) {
  8. for {
  9. select {
  10. case <-c.done:
  11. return
  12. default:
  13. if c.active {
  14. _, msg, err := c.Connection.ReadMessage()
  15. if err != nil {
  16. c.onError(err)
  17. c.active = false
  18. c.onDisconnect()
  19. // Wait 1 second or return immediately
  20. select {
  21. case <-time.After(time.Second):
  22. case <-c.done:
  23. return
  24. }
  25. } else {
  26. var resp struct {
  27. Type string `json:"type"`
  28. }
  29. if err := json.Unmarshal(msg, &resp); err != nil {
  30. c.onError(err)
  31. } else {
  32. if resp.Type == "PONG" {
  33. ct := time.Now()
  34. c.onPong(c.ping_start, ct)
  35. c.ping_start = ct
  36. c.ping_sended = false
  37. } else if resp.Type == "RECONNECT" {
  38. c.onInfo("warning, got RECONNECT response")
  39. c.active = false
  40. c.onDisconnect()
  41. c.ping_start = time.Now()
  42. c.ping_sended = false
  43. if err := c.Connection.Close(); err != nil {
  44. c.onError(err)
  45. }
  46. } else {
  47. c.onMessage(msg)
  48. }
  49. }
  50. }
  51. } else {
  52. // Wait 1 second or return immediately
  53. select {
  54. case <-time.After(time.Second):
  55. case <-c.done:
  56. return
  57. }
  58. }
  59. }
  60. }
  61. }(c)
  62. }