connection.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package pubsub
  2. import (
  3. "net/url"
  4. "sync"
  5. "time"
  6. "github.com/gorilla/websocket"
  7. )
  8. // At least once every 5 minutes by docs but better keep this at 30 seconds
  9. //
  10. // https://dev.twitch.tv/docs/pubsub/#connection-management
  11. const TwitchApiPingEach = 15 * time.Second // 30 seconds
  12. const TwitchApiPingTimeout = 10 * time.Second // 10 seconds
  13. var nextConnectionID int64 = 0
  14. // Connection is represent of one connection.
  15. type Connection struct {
  16. sync.RWMutex
  17. done chan struct{}
  18. topics map[string]*struct{}
  19. active bool
  20. url url.URL
  21. ping_start time.Time
  22. ping_sended bool
  23. ID int64
  24. Connection *websocket.Conn
  25. // Events
  26. eventOnConnect func(*Connection)
  27. eventOnDisconnect func(*Connection)
  28. eventOnError func(*Connection, error)
  29. eventOnInfo func(*Connection, string)
  30. eventOnMessage func(*Connection, *Answer)
  31. eventOnPing func(*Connection, time.Time)
  32. eventOnPong func(*Connection, time.Time, time.Time)
  33. }
  34. // NewConnection create new connection.
  35. // Returns pointer to connection.
  36. func NewConnection(url url.URL) *Connection {
  37. c := &Connection{
  38. done: make(chan struct{}),
  39. topics: map[string]*struct{}{},
  40. active: false,
  41. url: url,
  42. ping_start: time.Now(),
  43. ping_sended: false,
  44. ID: nextConnectionID,
  45. Connection: nil,
  46. }
  47. go_reconnector(c)
  48. go_reader(c)
  49. go_pinger(c)
  50. nextConnectionID++
  51. return c
  52. }
  53. // -----------------------------------------------------------------------------
  54. func (c *Connection) onConnect() {
  55. if c.eventOnConnect != nil {
  56. c.eventOnConnect(c)
  57. }
  58. }
  59. func (c *Connection) onDisconnect() {
  60. if c.eventOnDisconnect != nil {
  61. c.eventOnDisconnect(c)
  62. }
  63. }
  64. func (c *Connection) onError(err error) {
  65. if c.eventOnError != nil {
  66. c.eventOnError(c, err)
  67. }
  68. }
  69. func (c *Connection) onInfo(str string) {
  70. if c.eventOnInfo != nil {
  71. c.eventOnInfo(c, str)
  72. }
  73. }
  74. func (c *Connection) onMessage(msg *Answer) {
  75. if c.eventOnMessage != nil {
  76. c.eventOnMessage(c, msg)
  77. }
  78. }
  79. func (c *Connection) onPing(start time.Time) {
  80. if c.eventOnPing != nil {
  81. c.eventOnPing(c, start)
  82. }
  83. }
  84. func (c *Connection) onPong(start, end time.Time) {
  85. if c.eventOnPong != nil {
  86. c.eventOnPong(c, start, end)
  87. }
  88. }
  89. // -----------------------------------------------------------------------------
  90. // listenTopis is generate topics and send request to API.
  91. // Also it can close connection because it's API limits.
  92. // Each connection must listen at least one topic.
  93. func (c *Connection) listenTopis() {
  94. topics := []string{}
  95. for topic := range c.topics {
  96. topics = append(topics, topic)
  97. }
  98. // No topics, close connection
  99. if len(topics) <= 0 {
  100. c.active = false
  101. if c.Connection != nil {
  102. c.Connection.Close()
  103. }
  104. return
  105. }
  106. // Send LISTEN request
  107. if c.Connection != nil && c.active {
  108. // TODO: track bad topics and auto remove? [FUTURE]
  109. // One bad topic will break all next topics
  110. // The error message associated with the request, or an empty string if there is no error.
  111. // For Bits and whispers events requests, error responses can be:
  112. // ERR_BADMESSAGE, ERR_BADAUTH, ERR_SERVER, ERR_BADTOPIC
  113. msg := Answer{Type: Listen, Data: AnswerDataTopics{Topics: topics}}.JSON()
  114. if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil {
  115. c.onError(err)
  116. c.active = false
  117. c.onDisconnect()
  118. }
  119. }
  120. }
  121. // -----------------------------------------------------------------------------
  122. // AddTopic is adding topics for listening.
  123. func (c *Connection) AddTopic(topic string) {
  124. if _, ok := c.topics[topic]; ok {
  125. return
  126. }
  127. if len(c.topics) >= TwitchApiMaxTopics {
  128. return
  129. }
  130. c.Lock()
  131. defer c.Unlock()
  132. c.topics[topic] = nil
  133. c.listenTopis()
  134. }
  135. // RemoveTopic is remove topic from listening.
  136. func (c *Connection) RemoveTopic(topic string) {
  137. if _, ok := c.topics[topic]; !ok {
  138. return
  139. }
  140. c.Lock()
  141. defer c.Unlock()
  142. delete(c.topics, topic)
  143. // Send UNLISTEN request
  144. if c.Connection != nil && c.active {
  145. msg := Answer{Type: Unlisten, Data: AnswerDataTopics{Topics: []string{topic}}}.JSON()
  146. if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil {
  147. c.onError(err)
  148. c.active = false
  149. c.onDisconnect()
  150. }
  151. }
  152. // No topics, close connection
  153. if len(c.topics) <= 0 {
  154. c.active = false
  155. if c.Connection != nil {
  156. c.Connection.Close()
  157. }
  158. }
  159. }
  160. // RemoveAllTopics is remove all topics from listening.
  161. func (c *Connection) RemoveAllTopics() {
  162. c.Lock()
  163. defer c.Unlock()
  164. c.topics = map[string]*struct{}{}
  165. c.listenTopis()
  166. }
  167. // Topics returns all current listen topics.
  168. func (c *Connection) Topics() []string {
  169. c.Lock()
  170. defer c.Unlock()
  171. topics := []string{}
  172. for topic := range c.topics {
  173. topics = append(topics, topic)
  174. }
  175. return topics
  176. }
  177. // HasTopic returns true if topic present.
  178. func (c *Connection) HasTopic(topic string) bool {
  179. if _, ok := c.topics[topic]; ok {
  180. return true
  181. }
  182. return false
  183. }
  184. // TopicsCount return count of topics.
  185. func (c *Connection) TopicsCount() int {
  186. return len(c.topics)
  187. }
  188. // Close is close connection and shutdown all goroutines.
  189. // Usually it's need to call before destroying.
  190. func (c *Connection) Close() error {
  191. c.active = false
  192. close(c.done)
  193. // It can be not initialized
  194. if c.Connection != nil {
  195. return c.Connection.Close()
  196. }
  197. return nil
  198. }
  199. // -----------------------------------------------------------------------------
  200. func (c *Connection) OnConnect(fn func(*Connection)) {
  201. c.eventOnConnect = fn
  202. }
  203. func (c *Connection) OnDisconnect(fn func(*Connection)) {
  204. c.eventOnDisconnect = fn
  205. }
  206. func (c *Connection) OnError(fn func(*Connection, error)) {
  207. c.eventOnError = fn
  208. }
  209. func (c *Connection) OnInfo(fn func(*Connection, string)) {
  210. c.eventOnInfo = fn
  211. }
  212. func (c *Connection) OnMessage(fn func(*Connection, *Answer)) {
  213. c.eventOnMessage = fn
  214. }
  215. func (c *Connection) OnPing(fn func(*Connection, time.Time)) {
  216. c.eventOnPing = fn
  217. }
  218. func (c *Connection) OnPong(fn func(*Connection, time.Time, time.Time)) {
  219. c.eventOnPong = fn
  220. }