connection.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package pubsub
  2. import (
  3. "net/url"
  4. "sync"
  5. "time"
  6. "github.com/gorilla/websocket"
  7. )
  8. // At least once every 5 minutes by docs but better keep this at 30 seconds
  9. //
  10. // https://dev.twitch.tv/docs/pubsub/#connection-management
  11. const TwitchApiPingEach = 15 * time.Second // 30 seconds
  12. const TwitchApiPingTimeout = 10 * time.Second // 10 seconds
  13. var nextConnectionID int64 = 0
  14. // Connection is represent of one connection.
  15. type Connection struct {
  16. sync.RWMutex
  17. done chan struct{}
  18. topics map[string]*struct{}
  19. active bool
  20. url url.URL
  21. ping_start time.Time
  22. ping_sended bool
  23. ID int64
  24. Connection *websocket.Conn
  25. // Events
  26. eventOnConnect func(*Connection)
  27. eventOnDisconnect func(*Connection)
  28. eventOnError func(*Connection, error)
  29. eventOnInfo func(*Connection, string)
  30. eventOnMessage func(*Connection, *Answer)
  31. eventOnPing func(*Connection, time.Time)
  32. eventOnPong func(*Connection, time.Time, time.Time)
  33. }
  34. // NewConnection create new connection.
  35. // Returns pointer to connection.
  36. func NewConnection(url url.URL) *Connection {
  37. c := &Connection{
  38. done: make(chan struct{}),
  39. topics: map[string]*struct{}{},
  40. active: false,
  41. url: url,
  42. ping_start: time.Now(),
  43. ping_sended: false,
  44. ID: nextConnectionID,
  45. Connection: nil,
  46. }
  47. go_reconnector(c)
  48. go_reader(c)
  49. go_pinger(c)
  50. nextConnectionID++
  51. return c
  52. }
  53. // -----------------------------------------------------------------------------
  54. func (c *Connection) onConnect() {
  55. if c.eventOnConnect != nil {
  56. c.eventOnConnect(c)
  57. }
  58. }
  59. func (c *Connection) onDisconnect() {
  60. if c.eventOnDisconnect != nil {
  61. c.eventOnDisconnect(c)
  62. }
  63. }
  64. func (c *Connection) onError(err error) {
  65. if c.eventOnError != nil {
  66. c.eventOnError(c, err)
  67. }
  68. }
  69. func (c *Connection) onInfo(str string) {
  70. if c.eventOnInfo != nil {
  71. c.eventOnInfo(c, str)
  72. }
  73. }
  74. func (c *Connection) onMessage(msg *Answer) {
  75. if c.eventOnMessage != nil {
  76. c.eventOnMessage(c, msg)
  77. }
  78. }
  79. func (c *Connection) onPing(start time.Time) {
  80. if c.eventOnPing != nil {
  81. c.eventOnPing(c, start)
  82. }
  83. }
  84. func (c *Connection) onPong(start, end time.Time) {
  85. if c.eventOnPong != nil {
  86. c.eventOnPong(c, start, end)
  87. }
  88. }
  89. // -----------------------------------------------------------------------------
  90. // listenTopis is generate topics and send request to API.
  91. // Also it can close connection because it's API limits.
  92. // Each connection must listen at least one topic.
  93. func (c *Connection) listenTopis() {
  94. topics := []string{}
  95. for topic := range c.topics {
  96. topics = append(topics, topic)
  97. }
  98. if len(topics) <= 0 {
  99. c.active = false
  100. if c.Connection != nil {
  101. c.Connection.Close()
  102. }
  103. return
  104. }
  105. // Send LISTEN request
  106. msg := Answer{Type: Listen, Data: AnswerDataTopics{Topics: topics}}.JSON()
  107. if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil {
  108. c.onError(err)
  109. c.active = false
  110. c.onDisconnect()
  111. }
  112. }
  113. // -----------------------------------------------------------------------------
  114. // AddTopic is adding topics for listening.
  115. func (c *Connection) AddTopic(topic string) {
  116. if _, ok := c.topics[topic]; ok {
  117. return
  118. }
  119. if len(c.topics) >= TwitchApiMaxTopics {
  120. return
  121. }
  122. c.Lock()
  123. defer c.Unlock()
  124. c.topics[topic] = nil
  125. c.listenTopis()
  126. }
  127. // RemoveTopic is remove topic from listening.
  128. func (c *Connection) RemoveTopic(topic string) {
  129. if _, ok := c.topics[topic]; !ok {
  130. return
  131. }
  132. c.Lock()
  133. defer c.Unlock()
  134. delete(c.topics, topic)
  135. c.listenTopis()
  136. }
  137. // RemoveAllTopics is remove all topics from listening.
  138. func (c *Connection) RemoveAllTopics() {
  139. c.Lock()
  140. defer c.Unlock()
  141. c.topics = map[string]*struct{}{}
  142. c.listenTopis()
  143. }
  144. // Topics returns all current listen topics.
  145. func (c *Connection) Topics() []string {
  146. topics := []string{}
  147. for topic := range c.topics {
  148. topics = append(topics, topic)
  149. }
  150. return topics
  151. }
  152. // Close is close connection and shutdown all goroutines.
  153. // Usually it's need to call before destroying.
  154. func (c *Connection) Close() error {
  155. c.active = false
  156. close(c.done)
  157. // It can be not initialized
  158. if c.Connection != nil {
  159. return c.Connection.Close()
  160. }
  161. return nil
  162. }
  163. // -----------------------------------------------------------------------------
  164. func (c *Connection) OnConnect(fn func(*Connection)) {
  165. c.eventOnConnect = fn
  166. }
  167. func (c *Connection) OnDisconnect(fn func(*Connection)) {
  168. c.eventOnDisconnect = fn
  169. }
  170. func (c *Connection) OnError(fn func(*Connection, error)) {
  171. c.eventOnError = fn
  172. }
  173. func (c *Connection) OnInfo(fn func(*Connection, string)) {
  174. c.eventOnInfo = fn
  175. }
  176. func (c *Connection) OnMessage(fn func(*Connection, *Answer)) {
  177. c.eventOnMessage = fn
  178. }
  179. func (c *Connection) OnPing(fn func(*Connection, time.Time)) {
  180. c.eventOnPing = fn
  181. }
  182. func (c *Connection) OnPong(fn func(*Connection, time.Time, time.Time)) {
  183. c.eventOnPong = fn
  184. }