connection.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package pubsub
  2. import (
  3. "net/url"
  4. "sync"
  5. "time"
  6. "github.com/gorilla/websocket"
  7. )
  8. // At least once every 5 minutes by docs but better keep this at 30 seconds
  9. //
  10. // https://dev.twitch.tv/docs/pubsub/#connection-management
  11. const TwitchApiPingEach = 15 * time.Second // 30 seconds
  12. const TwitchApiPingTimeout = 10 * time.Second // 10 seconds
  13. var nextConnectionID int64 = 0
  14. // Connection is represent of one connection.
  15. type Connection struct {
  16. sync.RWMutex
  17. done chan struct{}
  18. topics map[string]*struct{}
  19. active bool
  20. url url.URL
  21. ping_start time.Time
  22. ping_sended bool
  23. ID int64
  24. Connection *websocket.Conn
  25. // Events
  26. eventOnConnect func(*Connection)
  27. eventOnDisconnect func(*Connection)
  28. eventOnError func(*Connection, error)
  29. eventOnInfo func(*Connection, string)
  30. eventOnMessage func(*Connection, *Answer)
  31. eventOnPing func(*Connection, time.Time)
  32. eventOnPong func(*Connection, time.Time, time.Time)
  33. }
  34. // NewConnection create new connection.
  35. // Returns pointer to connection.
  36. func NewConnection(url url.URL) *Connection {
  37. c := &Connection{
  38. done: make(chan struct{}),
  39. topics: map[string]*struct{}{},
  40. active: false,
  41. url: url,
  42. ping_start: time.Now(),
  43. ping_sended: false,
  44. ID: nextConnectionID,
  45. Connection: nil,
  46. }
  47. go_reconnector(c)
  48. go_reader(c)
  49. go_pinger(c)
  50. nextConnectionID++
  51. return c
  52. }
  53. // -----------------------------------------------------------------------------
  54. func (c *Connection) onConnect() {
  55. if c.eventOnConnect != nil {
  56. c.eventOnConnect(c)
  57. }
  58. }
  59. func (c *Connection) onDisconnect() {
  60. if c.eventOnDisconnect != nil {
  61. c.eventOnDisconnect(c)
  62. }
  63. }
  64. func (c *Connection) onError(err error) {
  65. if c.eventOnError != nil {
  66. c.eventOnError(c, err)
  67. }
  68. }
  69. func (c *Connection) onInfo(str string) {
  70. if c.eventOnInfo != nil {
  71. c.eventOnInfo(c, str)
  72. }
  73. }
  74. func (c *Connection) onMessage(msg *Answer) {
  75. if c.eventOnMessage != nil {
  76. c.eventOnMessage(c, msg)
  77. }
  78. }
  79. func (c *Connection) onPing(start time.Time) {
  80. if c.eventOnPing != nil {
  81. c.eventOnPing(c, start)
  82. }
  83. }
  84. func (c *Connection) onPong(start, end time.Time) {
  85. if c.eventOnPong != nil {
  86. c.eventOnPong(c, start, end)
  87. }
  88. }
  89. // -----------------------------------------------------------------------------
  90. // listenTopis is generate topics and send request to API.
  91. // Also it can close connection because it's API limits.
  92. // Each connection must listen at least one topic.
  93. func (c *Connection) listenTopis() {
  94. topics := []string{}
  95. for topic := range c.topics {
  96. topics = append(topics, topic)
  97. }
  98. // No topics, close connection
  99. if len(topics) <= 0 {
  100. c.active = false
  101. if c.Connection != nil {
  102. c.Connection.Close()
  103. }
  104. return
  105. }
  106. // Send LISTEN request
  107. msg := Answer{Type: Listen, Data: AnswerDataTopics{Topics: topics}}.JSON()
  108. if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil {
  109. c.onError(err)
  110. c.active = false
  111. c.onDisconnect()
  112. }
  113. }
  114. // -----------------------------------------------------------------------------
  115. // AddTopic is adding topics for listening.
  116. func (c *Connection) AddTopic(topic string) {
  117. if _, ok := c.topics[topic]; ok {
  118. return
  119. }
  120. if len(c.topics) >= TwitchApiMaxTopics {
  121. return
  122. }
  123. c.Lock()
  124. defer c.Unlock()
  125. c.topics[topic] = nil
  126. c.listenTopis()
  127. }
  128. // RemoveTopic is remove topic from listening.
  129. func (c *Connection) RemoveTopic(topic string) {
  130. if _, ok := c.topics[topic]; !ok {
  131. return
  132. }
  133. c.Lock()
  134. defer c.Unlock()
  135. delete(c.topics, topic)
  136. // Send UNLISTEN request
  137. msg := Answer{Type: Unlisten, Data: AnswerDataTopics{Topics: []string{topic}}}.JSON()
  138. if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil {
  139. c.onError(err)
  140. c.active = false
  141. c.onDisconnect()
  142. }
  143. // No topics, close connection
  144. if len(c.topics) <= 0 {
  145. c.active = false
  146. if c.Connection != nil {
  147. c.Connection.Close()
  148. }
  149. }
  150. }
  151. // RemoveAllTopics is remove all topics from listening.
  152. func (c *Connection) RemoveAllTopics() {
  153. c.Lock()
  154. defer c.Unlock()
  155. c.topics = map[string]*struct{}{}
  156. c.listenTopis()
  157. }
  158. // Topics returns all current listen topics.
  159. func (c *Connection) Topics() []string {
  160. topics := []string{}
  161. for topic := range c.topics {
  162. topics = append(topics, topic)
  163. }
  164. return topics
  165. }
  166. // Close is close connection and shutdown all goroutines.
  167. // Usually it's need to call before destroying.
  168. func (c *Connection) Close() error {
  169. c.active = false
  170. close(c.done)
  171. // It can be not initialized
  172. if c.Connection != nil {
  173. return c.Connection.Close()
  174. }
  175. return nil
  176. }
  177. // -----------------------------------------------------------------------------
  178. func (c *Connection) OnConnect(fn func(*Connection)) {
  179. c.eventOnConnect = fn
  180. }
  181. func (c *Connection) OnDisconnect(fn func(*Connection)) {
  182. c.eventOnDisconnect = fn
  183. }
  184. func (c *Connection) OnError(fn func(*Connection, error)) {
  185. c.eventOnError = fn
  186. }
  187. func (c *Connection) OnInfo(fn func(*Connection, string)) {
  188. c.eventOnInfo = fn
  189. }
  190. func (c *Connection) OnMessage(fn func(*Connection, *Answer)) {
  191. c.eventOnMessage = fn
  192. }
  193. func (c *Connection) OnPing(fn func(*Connection, time.Time)) {
  194. c.eventOnPing = fn
  195. }
  196. func (c *Connection) OnPong(fn func(*Connection, time.Time, time.Time)) {
  197. c.eventOnPong = fn
  198. }