go_reader.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package pubsub
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "time"
  6. )
  7. func go_reader(c *Connection) {
  8. go func(c *Connection) {
  9. for {
  10. select {
  11. case <-c.done:
  12. return
  13. default:
  14. if c.active {
  15. _, msg, err := c.Connection.ReadMessage()
  16. if err != nil {
  17. c.onError(err)
  18. c.active = false
  19. c.onDisconnect()
  20. // Wait 1 second or return immediately
  21. select {
  22. case <-time.After(time.Second):
  23. case <-c.done:
  24. return
  25. }
  26. } else {
  27. var answer Answer
  28. if err := json.Unmarshal(msg, &answer); err != nil {
  29. c.onError(err)
  30. } else {
  31. if answer.Type == Pong {
  32. ct := time.Now()
  33. c.onPong(c.ping_start, ct)
  34. c.ping_start = ct
  35. c.ping_sended = false
  36. } else if answer.Type == Reconnect {
  37. c.onInfo(fmt.Sprintf("warning, got %s response", Reconnect))
  38. c.active = false
  39. c.onDisconnect()
  40. c.ping_start = time.Now()
  41. c.ping_sended = false
  42. if err := c.Connection.Close(); err != nil {
  43. c.onError(err)
  44. }
  45. } else if answer.Type == Response {
  46. if answer.HasError() {
  47. c.onError(fmt.Errorf(answer.Error))
  48. } else {
  49. c.onInfo(fmt.Sprintf("type: %s, data: %#v", answer.Type, answer.Data))
  50. }
  51. } else {
  52. (&answer).Parse()
  53. c.onMessage(&answer)
  54. }
  55. }
  56. }
  57. } else {
  58. // Wait 1 second or return immediately
  59. select {
  60. case <-time.After(time.Second):
  61. case <-c.done:
  62. return
  63. }
  64. }
  65. }
  66. }
  67. }(c)
  68. }