pubsub.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
// Package pubsub implements a Twitch PubSub API client that automatically
// takes care of API limits. It also automatically handles reconnections,
// ping/pong and maintenance requests.
  4. package pubsub
  5. import (
  6. "fmt"
  7. "net/url"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. // Default Twitch server API credentials.
  13. //
  14. // https://dev.twitch.tv/docs/pubsub/#connection-management
  15. const TwitchApiScheme = "wss"
  16. const TwitchApiHost = "pubsub-edge.twitch.tv"
  17. const TwitchApiPath = ""
  18. const TwitchApiMaxTopics = 50
  19. // PubSub is represent of API client.
  20. type PubSub struct {
  21. sync.RWMutex
  22. URL url.URL
  23. Connections map[int64]*Connection
  24. // Events
  25. eventOnConnect func(*Connection)
  26. eventOnDisconnect func(*Connection)
  27. eventOnError func(*Connection, error)
  28. eventOnInfo func(*Connection, string)
  29. eventOnMessage func(*Connection, *Answer)
  30. eventOnPing func(*Connection, time.Time)
  31. eventOnPong func(*Connection, time.Time, time.Time)
  32. }
  33. // New create and returns new API client.
  34. func New() *PubSub {
  35. return NewWithURL(url.URL{
  36. Scheme: TwitchApiScheme,
  37. Host: TwitchApiHost,
  38. Path: TwitchApiPath,
  39. })
  40. }
  41. // NewWithURL create and returns new API client with custom API server URL.
  42. // It can be useful for testing.
  43. func NewWithURL(url url.URL) *PubSub {
  44. p := PubSub{
  45. URL: url,
  46. Connections: map[int64]*Connection{},
  47. }
  48. return &p
  49. }
  50. // -----------------------------------------------------------------------------
  51. func (p *PubSub) newConnection() *Connection {
  52. c := NewConnection(p.URL)
  53. c.OnConnect(p.eventOnConnect)
  54. c.OnDisconnect(p.eventOnDisconnect)
  55. c.OnError(p.eventOnError)
  56. c.OnInfo(p.eventOnInfo)
  57. c.OnMessage(p.eventOnMessage)
  58. c.OnPing(p.eventOnPing)
  59. c.OnPong(p.eventOnPong)
  60. return c
  61. }
  62. // -----------------------------------------------------------------------------
  63. // Listen is adding topics for listening. It take care of API limits.
  64. // New TCP connection will be created for every 50 topics.
  65. //
  66. // https://dev.twitch.tv/docs/pubsub/#connection-management
  67. func (p *PubSub) Listen(topic string, params ...interface{}) {
  68. p.Lock()
  69. defer p.Unlock()
  70. // Create and add first connection
  71. if len(p.Connections) <= 0 {
  72. c := p.newConnection()
  73. p.Connections[c.ID] = c
  74. }
  75. t := p.Topic(topic, params...)
  76. // Check topic in connection
  77. // Don't continue if already present
  78. for _, c := range p.Connections {
  79. if c.HasTopic(t) {
  80. return
  81. }
  82. }
  83. // Add topic to first not busy connection
  84. for _, c := range p.Connections {
  85. if c.TopicsCount() < TwitchApiMaxTopics {
  86. c.AddTopic(t)
  87. return
  88. }
  89. }
  90. // Create new one and add
  91. c := p.newConnection()
  92. p.Connections[c.ID] = c
  93. c.AddTopic(t)
  94. }
  95. // Unlisten is remove topics from listening. It take care of API limits too.
  96. // Connection count will automatically decrease of needs.
  97. //
  98. // https://dev.twitch.tv/docs/pubsub/#connection-management
  99. func (p *PubSub) Unlisten(topic string, params ...interface{}) {
  100. p.Lock()
  101. defer p.Unlock()
  102. t := p.Topic(topic, params...)
  103. // Search and unlisten
  104. for _, c := range p.Connections {
  105. if c.HasTopic(t) {
  106. c.RemoveTopic(t)
  107. // Must not contain duplicates
  108. // So just remove first and break
  109. break
  110. }
  111. }
  112. // Remove empty connections
  113. for i, c := range p.Connections {
  114. if c.TopicsCount() <= 0 {
  115. _ = c.Close()
  116. delete(p.Connections, i)
  117. // Can't be more than one connection without topics
  118. // So just close, remove first and break
  119. break
  120. }
  121. }
  122. }
  123. // HasTopic returns true if topic present.
  124. func (p *PubSub) HasTopic(topic string, params ...interface{}) bool {
  125. p.Lock()
  126. defer p.Unlock()
  127. t := p.Topic(topic, params...)
  128. for _, c := range p.Connections {
  129. if c.HasTopic(t) {
  130. return true
  131. }
  132. }
  133. return false
  134. }
  135. // Topic generate correct topic for API.
  136. // Params can be as number or string.
  137. //
  138. // https://dev.twitch.tv/docs/pubsub/#topics
  139. func (p *PubSub) Topic(topic string, params ...interface{}) string {
  140. if len(params) <= 0 {
  141. return topic
  142. }
  143. var list []string
  144. for _, param := range params {
  145. list = append(list, fmt.Sprint(param))
  146. }
  147. return fmt.Sprintf("%s.%s", topic, strings.Join(list, "."))
  148. }
  149. // Close is close all connections.
  150. // Usually need to call at the end of app life.
  151. func (p *PubSub) Close() {
  152. p.Lock()
  153. defer p.Unlock()
  154. for i, c := range p.Connections {
  155. _ = c.Close()
  156. delete(p.Connections, i)
  157. }
  158. }
  159. // -----------------------------------------------------------------------------
  160. // OnConnect is bind func to event.
  161. // Will fire for every connection.
  162. func (c *PubSub) OnConnect(fn func(*Connection)) {
  163. c.eventOnConnect = fn
  164. }
  165. // OnDisconnect is bind func to event.
  166. // Will fire for every connection.
  167. func (c *PubSub) OnDisconnect(fn func(*Connection)) {
  168. c.eventOnDisconnect = fn
  169. }
  170. // OnError is bind func to event.
  171. // Will fire for every connection.
  172. func (c *PubSub) OnError(fn func(*Connection, error)) {
  173. c.eventOnError = fn
  174. }
  175. // OnInfo is bind func to event.
  176. // Will fire for every connection.
  177. func (c *PubSub) OnInfo(fn func(*Connection, string)) {
  178. c.eventOnInfo = fn
  179. }
  180. // OnMessage is bind func to event.
  181. // Will fire for every connection.
  182. func (c *PubSub) OnMessage(fn func(*Connection, *Answer)) {
  183. c.eventOnMessage = fn
  184. }
  185. // OnPing is bind func to event.
  186. // Will fire for every connection.
  187. func (c *PubSub) OnPing(fn func(*Connection, time.Time)) {
  188. c.eventOnPing = fn
  189. }
  190. // OnPong is bind func to event.
  191. // Will fire for every connection.
  192. func (c *PubSub) OnPong(fn func(*Connection, time.Time, time.Time)) {
  193. c.eventOnPong = fn
  194. }