pubsub.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
// Package pubsub implements a Twitch PubSub API client that automatically takes
// care of API limits. It also handles reconnections, ping/pong and maintenance
// requests automatically.
  4. package pubsub
  5. import (
  6. "context"
  7. "fmt"
  8. "net/url"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. // Default Twitch server API credentials.
  14. //
  15. // https://dev.twitch.tv/docs/pubsub/#connection-management
  16. const TwitchApiScheme = "wss"
  17. const TwitchApiHost = "pubsub-edge.twitch.tv"
  18. const TwitchApiPath = ""
  19. const TwitchApiMaxTopics = 50
  20. // PubSub is represent of API client.
  21. type PubSub struct {
  22. sync.RWMutex
  23. URL url.URL
  24. Connections map[int64]*Connection
  25. // Events
  26. eventOnConnect func(*Connection)
  27. eventOnDisconnect func(*Connection)
  28. eventOnError func(*Connection, error)
  29. eventOnInfo func(*Connection, string)
  30. eventOnMessage func(*Connection, *Answer)
  31. eventOnPing func(*Connection, time.Time)
  32. eventOnPong func(*Connection, time.Time, time.Time)
  33. }
  34. // New create and returns new API client.
  35. func New() *PubSub {
  36. return NewWithURL(url.URL{
  37. Scheme: TwitchApiScheme,
  38. Host: TwitchApiHost,
  39. Path: TwitchApiPath,
  40. })
  41. }
  42. // NewWithURL create and returns new API client with custom API server URL.
  43. // It can be useful for testing.
  44. func NewWithURL(url url.URL) *PubSub {
  45. p := PubSub{
  46. URL: url,
  47. Connections: map[int64]*Connection{},
  48. }
  49. return &p
  50. }
  51. // -----------------------------------------------------------------------------
  52. func (p *PubSub) newConnection() *Connection {
  53. c := NewConnection(p.URL)
  54. c.OnConnect(p.eventOnConnect)
  55. c.OnDisconnect(p.eventOnDisconnect)
  56. c.OnError(p.eventOnError)
  57. c.OnInfo(p.eventOnInfo)
  58. c.OnMessage(p.eventOnMessage)
  59. c.OnPing(p.eventOnPing)
  60. c.OnPong(p.eventOnPong)
  61. return c
  62. }
  63. // -----------------------------------------------------------------------------
  64. // Listen is adding topics for listening. It take care of API limits.
  65. // New TCP connection will be created for every 50 topics.
  66. //
  67. // https://dev.twitch.tv/docs/pubsub/#connection-management
  68. func (p *PubSub) Listen(ctx context.Context, topic string, params ...interface{}) {
  69. p.Lock()
  70. defer p.Unlock()
  71. // Create and add first connection
  72. if len(p.Connections) <= 0 {
  73. c := p.newConnection()
  74. p.Connections[c.ID] = c
  75. }
  76. t := p.Topic(topic, params...)
  77. // Check topic in connection
  78. // Don't continue if already present
  79. for _, c := range p.Connections {
  80. select {
  81. case <-ctx.Done():
  82. return
  83. default:
  84. }
  85. if c.HasTopic(t) {
  86. return
  87. }
  88. }
  89. // Add topic to first not busy connection
  90. for _, c := range p.Connections {
  91. select {
  92. case <-ctx.Done():
  93. return
  94. default:
  95. }
  96. if c.TopicsCount() < TwitchApiMaxTopics {
  97. c.AddTopic(t)
  98. return
  99. }
  100. }
  101. // Create new one and add
  102. c := p.newConnection()
  103. p.Connections[c.ID] = c
  104. c.AddTopic(t)
  105. }
  106. // Unlisten is remove topics from listening. It take care of API limits too.
  107. // Connection count will automatically decrease of needs.
  108. //
  109. // https://dev.twitch.tv/docs/pubsub/#connection-management
  110. func (p *PubSub) Unlisten(ctx context.Context, topic string, params ...interface{}) {
  111. p.Lock()
  112. defer p.Unlock()
  113. t := p.Topic(topic, params...)
  114. // Search and unlisten
  115. for _, c := range p.Connections {
  116. select {
  117. case <-ctx.Done():
  118. return
  119. default:
  120. }
  121. if c.HasTopic(t) {
  122. c.RemoveTopic(t)
  123. // Must not contain duplicates
  124. // So just remove first and break
  125. break
  126. }
  127. }
  128. // Remove empty connections
  129. for i, c := range p.Connections {
  130. select {
  131. case <-ctx.Done():
  132. return
  133. default:
  134. }
  135. if c.TopicsCount() <= 0 {
  136. _ = c.Close()
  137. delete(p.Connections, i)
  138. // Can't be more than one connection without topics
  139. // So just close, remove first and break
  140. break
  141. }
  142. }
  143. }
  144. // Topics returns all current listen topics.
  145. func (p *PubSub) Topics() []string {
  146. p.Lock()
  147. defer p.Unlock()
  148. topics := []string{}
  149. for _, c := range p.Connections {
  150. for topic := range c.topics {
  151. topics = append(topics, topic)
  152. }
  153. }
  154. return topics
  155. }
  156. // HasTopic returns true if topic present.
  157. func (p *PubSub) HasTopic(topic string, params ...interface{}) bool {
  158. p.Lock()
  159. defer p.Unlock()
  160. t := p.Topic(topic, params...)
  161. for _, c := range p.Connections {
  162. if c.HasTopic(t) {
  163. return true
  164. }
  165. }
  166. return false
  167. }
  168. // TopicsCount return count of topics.
  169. func (p *PubSub) TopicsCount() int {
  170. p.Lock()
  171. defer p.Unlock()
  172. count := 0
  173. for _, c := range p.Connections {
  174. count += c.TopicsCount()
  175. }
  176. return count
  177. }
  178. // Topic generate correct topic for API.
  179. // Params can be as number or string.
  180. //
  181. // https://dev.twitch.tv/docs/pubsub/#topics
  182. func (p *PubSub) Topic(topic string, params ...interface{}) string {
  183. if len(params) <= 0 {
  184. return topic
  185. }
  186. var list []string
  187. for _, param := range params {
  188. list = append(list, fmt.Sprint(param))
  189. }
  190. return fmt.Sprintf("%s.%s", topic, strings.Join(list, "."))
  191. }
  192. // Close is close all connections.
  193. // Usually need to call at the end of app life.
  194. func (p *PubSub) Close() {
  195. p.Lock()
  196. defer p.Unlock()
  197. for i, c := range p.Connections {
  198. _ = c.Close()
  199. delete(p.Connections, i)
  200. }
  201. }
  202. // -----------------------------------------------------------------------------
  203. // OnConnect is bind func to event.
  204. // Will fire for every connection.
  205. func (c *PubSub) OnConnect(fn func(*Connection)) {
  206. c.eventOnConnect = fn
  207. }
  208. // OnDisconnect is bind func to event.
  209. // Will fire for every connection.
  210. func (c *PubSub) OnDisconnect(fn func(*Connection)) {
  211. c.eventOnDisconnect = fn
  212. }
  213. // OnError is bind func to event.
  214. // Will fire for every connection.
  215. func (c *PubSub) OnError(fn func(*Connection, error)) {
  216. c.eventOnError = fn
  217. }
  218. // OnInfo is bind func to event.
  219. // Will fire for every connection.
  220. func (c *PubSub) OnInfo(fn func(*Connection, string)) {
  221. c.eventOnInfo = fn
  222. }
  223. // OnMessage is bind func to event.
  224. // Will fire for every connection.
  225. func (c *PubSub) OnMessage(fn func(*Connection, *Answer)) {
  226. c.eventOnMessage = fn
  227. }
  228. // OnPing is bind func to event.
  229. // Will fire for every connection.
  230. func (c *PubSub) OnPing(fn func(*Connection, time.Time)) {
  231. c.eventOnPing = fn
  232. }
  233. // OnPong is bind func to event.
  234. // Will fire for every connection.
  235. func (c *PubSub) OnPong(fn func(*Connection, time.Time, time.Time)) {
  236. c.eventOnPong = fn
  237. }