go_reader.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package pubsub
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "time"
  6. )
  7. func go_reader(c *Connection) {
  8. go func(c *Connection) {
  9. for {
  10. select {
  11. case <-c.done:
  12. return
  13. default:
  14. if c.active {
  15. _, msg, err := c.Connection.ReadMessage()
  16. if err != nil {
  17. c.onError(err)
  18. c.active = false
  19. c.onDisconnect()
  20. // Wait 1 second or return immediately
  21. select {
  22. case <-time.After(time.Second):
  23. case <-c.done:
  24. return
  25. }
  26. } else {
  27. var answer Answer
  28. if err := json.Unmarshal(msg, &answer); err != nil {
  29. c.onError(err)
  30. } else {
  31. if answer.Type == Pong {
  32. ct := time.Now()
  33. c.onPong(c.ping_start, ct)
  34. c.ping_start = ct
  35. c.ping_sended = false
  36. } else if answer.Type == Reconnect {
  37. c.onInfo(fmt.Sprintf("warning, got %s response", Reconnect))
  38. c.active = false
  39. c.onDisconnect()
  40. c.ping_start = time.Now()
  41. c.ping_sended = false
  42. if err := c.Connection.Close(); err != nil {
  43. c.onError(err)
  44. }
  45. } else if answer.Type == Response {
  46. if answer.HasError() {
  47. c.onError(fmt.Errorf(answer.Error))
  48. } else {
  49. c.onInfo(fmt.Sprintf("type: %s, data: %#v", answer.Type, answer.Data))
  50. }
  51. } else {
  52. c.onMessage(&answer)
  53. }
  54. }
  55. }
  56. } else {
  57. // Wait 1 second or return immediately
  58. select {
  59. case <-time.After(time.Second):
  60. case <-c.done:
  61. return
  62. }
  63. }
  64. }
  65. }
  66. }(c)
  67. }