connection.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package pubsub
  2. import (
  3. "net/url"
  4. "sync"
  5. "time"
  6. "github.com/gorilla/websocket"
  7. )
  8. // At least once every 5 minutes by docs but better keep this at 30 seconds
  9. //
  10. // https://dev.twitch.tv/docs/pubsub/#connection-management
  11. const TwitchApiPingEach = 15 * time.Second // 30 seconds
  12. const TwitchApiPingTimeout = 10 * time.Second // 10 seconds
  13. var nextConnectionID int64 = 0
  14. // Connection is represent of one connection.
  15. type Connection struct {
  16. sync.RWMutex
  17. done chan struct{}
  18. topics map[string]*struct{}
  19. active bool
  20. url url.URL
  21. ping_start time.Time
  22. ping_sended bool
  23. ID int64
  24. Connection *websocket.Conn
  25. // Events
  26. eventOnConnect func(*Connection)
  27. eventOnDisconnect func(*Connection)
  28. eventOnError func(*Connection, error)
  29. eventOnInfo func(*Connection, string)
  30. eventOnMessage func(*Connection, *Answer)
  31. eventOnPing func(*Connection, time.Time)
  32. eventOnPong func(*Connection, time.Time, time.Time)
  33. }
  34. // NewConnection create new connection.
  35. // Returns pointer to connection.
  36. func NewConnection(url url.URL) *Connection {
  37. c := &Connection{
  38. done: make(chan struct{}),
  39. topics: map[string]*struct{}{},
  40. active: false,
  41. url: url,
  42. ping_start: time.Now(),
  43. ping_sended: false,
  44. ID: nextConnectionID,
  45. Connection: nil,
  46. }
  47. go_reconnector(c)
  48. go_reader(c)
  49. go_pinger(c)
  50. nextConnectionID++
  51. return c
  52. }
  53. // -----------------------------------------------------------------------------
  54. func (c *Connection) onConnect() {
  55. if c.eventOnConnect != nil {
  56. c.eventOnConnect(c)
  57. }
  58. }
  59. func (c *Connection) onDisconnect() {
  60. if c.eventOnDisconnect != nil {
  61. c.eventOnDisconnect(c)
  62. }
  63. }
  64. func (c *Connection) onError(err error) {
  65. if c.eventOnError != nil {
  66. c.eventOnError(c, err)
  67. }
  68. }
  69. func (c *Connection) onInfo(str string) {
  70. if c.eventOnInfo != nil {
  71. c.eventOnInfo(c, str)
  72. }
  73. }
  74. func (c *Connection) onMessage(msg *Answer) {
  75. if c.eventOnMessage != nil {
  76. c.eventOnMessage(c, msg)
  77. }
  78. }
  79. func (c *Connection) onPing(start time.Time) {
  80. if c.eventOnPing != nil {
  81. c.eventOnPing(c, start)
  82. }
  83. }
  84. func (c *Connection) onPong(start, end time.Time) {
  85. if c.eventOnPong != nil {
  86. c.eventOnPong(c, start, end)
  87. }
  88. }
  89. // -----------------------------------------------------------------------------
  90. // listenTopis is generate topics and send request to API.
  91. // Also it can close connection because it's API limits.
  92. // Each connection must listen at least one topic.
  93. func (c *Connection) listenTopis() {
  94. topics := []string{}
  95. for topic := range c.topics {
  96. topics = append(topics, topic)
  97. }
  98. // No topics, close connection
  99. if len(topics) <= 0 {
  100. c.active = false
  101. if c.Connection != nil {
  102. c.Connection.Close()
  103. }
  104. return
  105. }
  106. // Send LISTEN request
  107. if c.Connection != nil && c.active {
  108. msg := Answer{Type: Listen, Data: AnswerDataTopics{Topics: topics}}.JSON()
  109. if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil {
  110. c.onError(err)
  111. c.active = false
  112. c.onDisconnect()
  113. }
  114. }
  115. }
  116. // -----------------------------------------------------------------------------
  117. // AddTopic is adding topics for listening.
  118. func (c *Connection) AddTopic(topic string) {
  119. if _, ok := c.topics[topic]; ok {
  120. return
  121. }
  122. if len(c.topics) >= TwitchApiMaxTopics {
  123. return
  124. }
  125. c.Lock()
  126. defer c.Unlock()
  127. c.topics[topic] = nil
  128. c.listenTopis()
  129. }
  130. // RemoveTopic is remove topic from listening.
  131. func (c *Connection) RemoveTopic(topic string) {
  132. if _, ok := c.topics[topic]; !ok {
  133. return
  134. }
  135. c.Lock()
  136. defer c.Unlock()
  137. delete(c.topics, topic)
  138. // Send UNLISTEN request
  139. if c.Connection != nil && c.active {
  140. msg := Answer{Type: Unlisten, Data: AnswerDataTopics{Topics: []string{topic}}}.JSON()
  141. if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil {
  142. c.onError(err)
  143. c.active = false
  144. c.onDisconnect()
  145. }
  146. }
  147. // No topics, close connection
  148. if len(c.topics) <= 0 {
  149. c.active = false
  150. if c.Connection != nil {
  151. c.Connection.Close()
  152. }
  153. }
  154. }
  155. // RemoveAllTopics is remove all topics from listening.
  156. func (c *Connection) RemoveAllTopics() {
  157. c.Lock()
  158. defer c.Unlock()
  159. c.topics = map[string]*struct{}{}
  160. c.listenTopis()
  161. }
  162. // Topics returns all current listen topics.
  163. func (c *Connection) Topics() []string {
  164. topics := []string{}
  165. for topic := range c.topics {
  166. topics = append(topics, topic)
  167. }
  168. return topics
  169. }
  170. // TopicsCount return count of topics.
  171. func (c *Connection) TopicsCount() int {
  172. return len(c.topics)
  173. }
  174. // Close is close connection and shutdown all goroutines.
  175. // Usually it's need to call before destroying.
  176. func (c *Connection) Close() error {
  177. c.active = false
  178. close(c.done)
  179. // It can be not initialized
  180. if c.Connection != nil {
  181. return c.Connection.Close()
  182. }
  183. return nil
  184. }
  185. // -----------------------------------------------------------------------------
  186. func (c *Connection) OnConnect(fn func(*Connection)) {
  187. c.eventOnConnect = fn
  188. }
  189. func (c *Connection) OnDisconnect(fn func(*Connection)) {
  190. c.eventOnDisconnect = fn
  191. }
  192. func (c *Connection) OnError(fn func(*Connection, error)) {
  193. c.eventOnError = fn
  194. }
  195. func (c *Connection) OnInfo(fn func(*Connection, string)) {
  196. c.eventOnInfo = fn
  197. }
  198. func (c *Connection) OnMessage(fn func(*Connection, *Answer)) {
  199. c.eventOnMessage = fn
  200. }
  201. func (c *Connection) OnPing(fn func(*Connection, time.Time)) {
  202. c.eventOnPing = fn
  203. }
  204. func (c *Connection) OnPong(fn func(*Connection, time.Time, time.Time)) {
  205. c.eventOnPong = fn
  206. }