connection.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. package pubsub
  2. import (
  3. "net/url"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. )
  9. // At least once every 5 minutes by docs but better keep this at 30 seconds
  10. //
  11. // https://dev.twitch.tv/docs/pubsub/#connection-management
  12. const TwitchApiPingEach = 15 * time.Second // 30 seconds
  13. const TwitchApiPingTimeout = 10 * time.Second // 10 seconds
  14. var nextConnectionID int64 = 0
  15. // Connection is represent of one connection.
  16. type Connection struct {
  17. sync.RWMutex
  18. done chan struct{}
  19. topics map[string]*struct{}
  20. active bool
  21. url url.URL
  22. ping_start time.Time
  23. ping_sended bool
  24. ID int64
  25. Connection *websocket.Conn
  26. // Events
  27. eventOnConnect func(*Connection)
  28. eventOnDisconnect func(*Connection)
  29. eventOnError func(*Connection, error)
  30. eventOnInfo func(*Connection, string)
  31. eventOnMessage func(*Connection, []byte)
  32. eventOnPing func(*Connection, time.Time)
  33. eventOnPong func(*Connection, time.Time, time.Time)
  34. }
  35. // NewConnection create new connection.
  36. // Returns pointer to connection.
  37. func NewConnection(url url.URL) *Connection {
  38. c := &Connection{
  39. done: make(chan struct{}),
  40. topics: map[string]*struct{}{},
  41. active: false,
  42. url: url,
  43. ping_start: time.Now(),
  44. ping_sended: false,
  45. ID: nextConnectionID,
  46. Connection: nil,
  47. }
  48. go_reconnector(c)
  49. go_reader(c)
  50. go_pinger(c)
  51. nextConnectionID++
  52. return c
  53. }
  54. // -----------------------------------------------------------------------------
  55. func (c *Connection) onConnect() {
  56. if c.eventOnConnect != nil {
  57. c.eventOnConnect(c)
  58. }
  59. }
  60. func (c *Connection) onDisconnect() {
  61. if c.eventOnDisconnect != nil {
  62. c.eventOnDisconnect(c)
  63. }
  64. }
  65. func (c *Connection) onError(err error) {
  66. if c.eventOnError != nil {
  67. c.eventOnError(c, err)
  68. }
  69. }
  70. func (c *Connection) onInfo(str string) {
  71. if c.eventOnInfo != nil {
  72. c.eventOnInfo(c, str)
  73. }
  74. }
  75. func (c *Connection) onMessage(msg []byte) {
  76. if c.eventOnMessage != nil {
  77. c.eventOnMessage(c, msg)
  78. }
  79. }
  80. func (c *Connection) onPing(start time.Time) {
  81. if c.eventOnPing != nil {
  82. c.eventOnPing(c, start)
  83. }
  84. }
  85. func (c *Connection) onPong(start, end time.Time) {
  86. if c.eventOnPong != nil {
  87. c.eventOnPong(c, start, end)
  88. }
  89. }
  90. // -----------------------------------------------------------------------------
  91. func (c *Connection) listenTopis() {
  92. topics := []string{}
  93. for topic := range c.topics {
  94. topics = append(topics, topic)
  95. }
  96. msg := `{"type":"LISTEN","nonce":"","data":{"topics":["` + strings.Join(topics, `","`) + `"]}}`
  97. if err := c.Connection.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil {
  98. c.onError(err)
  99. c.active = false
  100. c.onDisconnect()
  101. }
  102. }
  103. // -----------------------------------------------------------------------------
  104. // AddTopic is adding topics for listening.
  105. func (c *Connection) AddTopic(topic string) {
  106. if _, ok := c.topics[topic]; ok {
  107. return
  108. }
  109. if len(c.topics) >= TwitchApiMaxTopics {
  110. return
  111. }
  112. c.Lock()
  113. defer c.Unlock()
  114. c.topics[topic] = nil
  115. // TODO: Send cmd...
  116. }
  117. // RemoveTopic is remove topics from listening.
  118. func (c *Connection) RemoveTopic(topic string) {
  119. if _, ok := c.topics[topic]; !ok {
  120. return
  121. }
  122. c.Lock()
  123. defer c.Unlock()
  124. delete(c.topics, topic)
  125. // TODO: Send cmd...
  126. }
  127. // Topics returns all current listen topics.
  128. func (c *Connection) Topics() []string {
  129. // TODO: ...
  130. return []string{}
  131. }
  132. // Close is close connection and shutdown all goroutines.
  133. // Usually it's need to call before destroying.
  134. func (c *Connection) Close() error {
  135. c.active = false
  136. close(c.done)
  137. // It can be not initialized
  138. if c.Connection != nil {
  139. return c.Connection.Close()
  140. }
  141. return nil
  142. }
  143. // -----------------------------------------------------------------------------
  144. func (c *Connection) OnConnect(fn func(*Connection)) {
  145. c.eventOnConnect = fn
  146. }
  147. func (c *Connection) OnDisconnect(fn func(*Connection)) {
  148. c.eventOnDisconnect = fn
  149. }
  150. func (c *Connection) OnError(fn func(*Connection, error)) {
  151. c.eventOnError = fn
  152. }
  153. func (c *Connection) OnInfo(fn func(*Connection, string)) {
  154. c.eventOnInfo = fn
  155. }
  156. func (c *Connection) OnMessage(fn func(*Connection, []byte)) {
  157. c.eventOnMessage = fn
  158. }
  159. func (c *Connection) OnPing(fn func(*Connection, time.Time)) {
  160. c.eventOnPing = fn
  161. }
  162. func (c *Connection) OnPong(fn func(*Connection, time.Time, time.Time)) {
  163. c.eventOnPong = fn
  164. }