pubsub.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. // Package implements Twitch API PubSub and automatically take care of API
  2. // limits. Also it will handle automatically reconnections, ping/pong and
  3. // maintenance requests.
  4. package pubsub
  5. import (
  6. "fmt"
  7. "net/url"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. // Default Twitch server API credentials.
  13. //
  14. // https://dev.twitch.tv/docs/pubsub/#connection-management
  15. const TwitchApiScheme = "wss"
  16. const TwitchApiHost = "pubsub-edge.twitch.tv"
  17. const TwitchApiPath = ""
  18. const TwitchApiMaxTopics = 50
  19. // PubSub is represent of API client.
  20. type PubSub struct {
  21. sync.RWMutex
  22. URL url.URL
  23. Connections map[int64]*Connection
  24. // Events
  25. eventOnConnect func(*Connection)
  26. eventOnDisconnect func(*Connection)
  27. eventOnError func(*Connection, error)
  28. eventOnInfo func(*Connection, string)
  29. eventOnMessage func(*Connection, *Answer)
  30. eventOnPing func(*Connection, time.Time)
  31. eventOnPong func(*Connection, time.Time, time.Time)
  32. }
  33. // New create and returns new API client.
  34. func New() *PubSub {
  35. return NewWithURL(url.URL{
  36. Scheme: TwitchApiScheme,
  37. Host: TwitchApiHost,
  38. Path: TwitchApiPath,
  39. })
  40. }
  41. // NewWithURL create and returns new API client with custom API server URL.
  42. // It can be useful for testing.
  43. func NewWithURL(url url.URL) *PubSub {
  44. p := PubSub{
  45. URL: url,
  46. Connections: map[int64]*Connection{},
  47. }
  48. return &p
  49. }
  50. // -----------------------------------------------------------------------------
  51. func (p *PubSub) newConnection() *Connection {
  52. c := NewConnection(p.URL)
  53. c.OnConnect(p.eventOnConnect)
  54. c.OnDisconnect(p.eventOnDisconnect)
  55. c.OnError(p.eventOnError)
  56. c.OnInfo(p.eventOnInfo)
  57. c.OnMessage(p.eventOnMessage)
  58. c.OnPing(p.eventOnPing)
  59. c.OnPong(p.eventOnPong)
  60. return c
  61. }
  62. // -----------------------------------------------------------------------------
  63. // Listen is adding topics for listening. It take care of API limits.
  64. // New TCP connection will be created for every 50 topics.
  65. //
  66. // https://dev.twitch.tv/docs/pubsub/#connection-management
  67. func (p *PubSub) Listen(topic string, params ...interface{}) {
  68. p.Lock()
  69. defer p.Unlock()
  70. // Create and add first connection
  71. if len(p.Connections) <= 0 {
  72. c := p.newConnection()
  73. p.Connections[c.ID] = c
  74. }
  75. t := p.Topic(topic, params...)
  76. // Check topic in connection
  77. // Don't continue if already present
  78. for _, c := range p.Connections {
  79. if c.HasTopic(t) {
  80. return
  81. }
  82. }
  83. // Add topic to first not busy connection
  84. for _, c := range p.Connections {
  85. if c.TopicsCount() < TwitchApiMaxTopics {
  86. c.AddTopic(t)
  87. return
  88. }
  89. }
  90. // Create new one and add
  91. c := p.newConnection()
  92. p.Connections[c.ID] = c
  93. c.AddTopic(t)
  94. }
  95. // Unlisten is remove topics from listening. It take care of API limits too.
  96. // Connection count will automatically decrease of needs.
  97. //
  98. // https://dev.twitch.tv/docs/pubsub/#connection-management
  99. func (p *PubSub) Unlisten(topic string, params ...interface{}) {
  100. p.Lock()
  101. defer p.Unlock()
  102. t := p.Topic(topic, params...)
  103. // Search and unlisten
  104. for _, c := range p.Connections {
  105. if c.HasTopic(t) {
  106. c.RemoveTopic(t)
  107. // Must not contain duplicates
  108. // So just remove first and break
  109. break
  110. }
  111. }
  112. // Remove empty connections
  113. for i, c := range p.Connections {
  114. if c.TopicsCount() <= 0 {
  115. _ = c.Close()
  116. delete(p.Connections, i)
  117. // Can't be more than one connection without topics
  118. // So just close, remove first and break
  119. break
  120. }
  121. }
  122. }
  123. // Topics returns all current listen topics.
  124. func (p *PubSub) Topics() []string {
  125. p.Lock()
  126. defer p.Unlock()
  127. topics := []string{}
  128. for _, c := range p.Connections {
  129. for topic := range c.topics {
  130. topics = append(topics, topic)
  131. }
  132. }
  133. return topics
  134. }
  135. // HasTopic returns true if topic present.
  136. func (p *PubSub) HasTopic(topic string, params ...interface{}) bool {
  137. p.Lock()
  138. defer p.Unlock()
  139. t := p.Topic(topic, params...)
  140. for _, c := range p.Connections {
  141. if c.HasTopic(t) {
  142. return true
  143. }
  144. }
  145. return false
  146. }
  147. // TopicsCount return count of topics.
  148. func (p *PubSub) TopicsCount() int {
  149. p.Lock()
  150. defer p.Unlock()
  151. count := 0
  152. for _, c := range p.Connections {
  153. count += c.TopicsCount()
  154. }
  155. return count
  156. }
  157. // Topic generate correct topic for API.
  158. // Params can be as number or string.
  159. //
  160. // https://dev.twitch.tv/docs/pubsub/#topics
  161. func (p *PubSub) Topic(topic string, params ...interface{}) string {
  162. if len(params) <= 0 {
  163. return topic
  164. }
  165. var list []string
  166. for _, param := range params {
  167. list = append(list, fmt.Sprint(param))
  168. }
  169. return fmt.Sprintf("%s.%s", topic, strings.Join(list, "."))
  170. }
  171. // Close is close all connections.
  172. // Usually need to call at the end of app life.
  173. func (p *PubSub) Close() {
  174. p.Lock()
  175. defer p.Unlock()
  176. for i, c := range p.Connections {
  177. _ = c.Close()
  178. delete(p.Connections, i)
  179. }
  180. }
  181. // -----------------------------------------------------------------------------
  182. // OnConnect is bind func to event.
  183. // Will fire for every connection.
  184. func (c *PubSub) OnConnect(fn func(*Connection)) {
  185. c.eventOnConnect = fn
  186. }
  187. // OnDisconnect is bind func to event.
  188. // Will fire for every connection.
  189. func (c *PubSub) OnDisconnect(fn func(*Connection)) {
  190. c.eventOnDisconnect = fn
  191. }
  192. // OnError is bind func to event.
  193. // Will fire for every connection.
  194. func (c *PubSub) OnError(fn func(*Connection, error)) {
  195. c.eventOnError = fn
  196. }
  197. // OnInfo is bind func to event.
  198. // Will fire for every connection.
  199. func (c *PubSub) OnInfo(fn func(*Connection, string)) {
  200. c.eventOnInfo = fn
  201. }
  202. // OnMessage is bind func to event.
  203. // Will fire for every connection.
  204. func (c *PubSub) OnMessage(fn func(*Connection, *Answer)) {
  205. c.eventOnMessage = fn
  206. }
  207. // OnPing is bind func to event.
  208. // Will fire for every connection.
  209. func (c *PubSub) OnPing(fn func(*Connection, time.Time)) {
  210. c.eventOnPing = fn
  211. }
  212. // OnPong is bind func to event.
  213. // Will fire for every connection.
  214. func (c *PubSub) OnPong(fn func(*Connection, time.Time, time.Time)) {
  215. c.eventOnPong = fn
  216. }