pubsub.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. // Package implements Twitch API PubSub and automatically take care of API
  2. // limits. Also it will handle automatically reconnections, ping/pong and
  3. // maintenance requests.
  4. package pubsub
  5. import (
  6. "fmt"
  7. "net/url"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. // Default Twitch server API credentials.
  13. //
  14. // https://dev.twitch.tv/docs/pubsub/#connection-management
  15. const TwitchApiScheme = "wss"
  16. const TwitchApiHost = "pubsub-edge.twitch.tv"
  17. const TwitchApiPath = ""
  18. const TwitchApiMaxTopics = 50
  19. // PubSub is represent of API client.
  20. type PubSub struct {
  21. sync.RWMutex
  22. URL url.URL
  23. Connections map[int64]*Connection
  24. // Events
  25. eventOnConnect func(*Connection)
  26. eventOnDisconnect func(*Connection)
  27. eventOnError func(*Connection, error)
  28. eventOnInfo func(*Connection, string)
  29. eventOnMessage func(*Connection, *Answer)
  30. eventOnPing func(*Connection, time.Time)
  31. eventOnPong func(*Connection, time.Time, time.Time)
  32. }
  33. // New create and returns new API client.
  34. func New() *PubSub {
  35. return NewWithURL(url.URL{
  36. Scheme: TwitchApiScheme,
  37. Host: TwitchApiHost,
  38. Path: TwitchApiPath,
  39. })
  40. }
  41. // NewWithURL create and returns new API client with custom API server URL.
  42. // It can be useful for testing.
  43. func NewWithURL(url url.URL) *PubSub {
  44. p := PubSub{
  45. URL: url,
  46. Connections: map[int64]*Connection{},
  47. }
  48. return &p
  49. }
  50. // -----------------------------------------------------------------------------
  51. func (p *PubSub) newConnection() *Connection {
  52. c := NewConnection(p.URL)
  53. c.OnConnect(p.eventOnConnect)
  54. c.OnDisconnect(p.eventOnDisconnect)
  55. c.OnError(p.eventOnError)
  56. c.OnInfo(p.eventOnInfo)
  57. c.OnMessage(p.eventOnMessage)
  58. c.OnPing(p.eventOnPing)
  59. c.OnPong(p.eventOnPong)
  60. return c
  61. }
  62. // -----------------------------------------------------------------------------
  63. // Listen is adding topics for listening. It take care of API limits.
  64. // New TCP connection will be created for every 50 topics.
  65. //
  66. // https://dev.twitch.tv/docs/pubsub/#connection-management
  67. func (p *PubSub) Listen(topic string, params ...interface{}) {
  68. p.Lock()
  69. defer p.Unlock()
  70. // Create and add first connection
  71. if len(p.Connections) <= 0 {
  72. c := p.newConnection()
  73. p.Connections[c.ID] = c
  74. }
  75. t := p.Topic(topic, params...)
  76. // Check topic in connection
  77. // Don't continue if already present
  78. for _, c := range p.Connections {
  79. if c.HasTopic(t) {
  80. return
  81. }
  82. }
  83. // Add topic to first not busy connection
  84. for _, c := range p.Connections {
  85. if c.TopicsCount() < TwitchApiMaxTopics {
  86. c.AddTopic(t)
  87. return
  88. }
  89. }
  90. // Create new one and add
  91. c := p.newConnection()
  92. p.Connections[c.ID] = c
  93. c.AddTopic(t)
  94. }
  95. // Unlisten is remove topics from listening. It take care of API limits too.
  96. // Connection count will automatically decrease of needs.
  97. //
  98. // https://dev.twitch.tv/docs/pubsub/#connection-management
  99. func (p *PubSub) Unlisten(topic string, params ...interface{}) {
  100. p.Lock()
  101. defer p.Unlock()
  102. t := p.Topic(topic, params...)
  103. // Search and unlisten
  104. for _, c := range p.Connections {
  105. if c.HasTopic(t) {
  106. c.RemoveTopic(t)
  107. // Must not contain duplicates
  108. // So just remove first and break
  109. break
  110. }
  111. }
  112. // Remove empty connections
  113. for i, c := range p.Connections {
  114. if c.TopicsCount() <= 0 {
  115. _ = c.Close()
  116. delete(p.Connections, i)
  117. // Can't be more than one connection without topics
  118. // So just close, remove first and break
  119. break
  120. }
  121. }
  122. }
  123. // Topic generate correct topic for API.
  124. // Params can be as number or string.
  125. //
  126. // https://dev.twitch.tv/docs/pubsub/#topics
  127. func (p *PubSub) Topic(topic string, params ...interface{}) string {
  128. if len(params) <= 0 {
  129. return topic
  130. }
  131. var list []string
  132. for _, param := range params {
  133. list = append(list, fmt.Sprint(param))
  134. }
  135. return fmt.Sprintf("%s.%s", topic, strings.Join(list, "."))
  136. }
  137. // Close is close all connections.
  138. // Usually need to call at the end of app life.
  139. func (p *PubSub) Close() {
  140. p.Lock()
  141. defer p.Unlock()
  142. for i, c := range p.Connections {
  143. _ = c.Close()
  144. delete(p.Connections, i)
  145. }
  146. }
  147. // -----------------------------------------------------------------------------
  148. // OnConnect is bind func to event.
  149. // Will fire for every connection.
  150. func (c *PubSub) OnConnect(fn func(*Connection)) {
  151. c.eventOnConnect = fn
  152. }
  153. // OnDisconnect is bind func to event.
  154. // Will fire for every connection.
  155. func (c *PubSub) OnDisconnect(fn func(*Connection)) {
  156. c.eventOnDisconnect = fn
  157. }
  158. // OnError is bind func to event.
  159. // Will fire for every connection.
  160. func (c *PubSub) OnError(fn func(*Connection, error)) {
  161. c.eventOnError = fn
  162. }
  163. // OnInfo is bind func to event.
  164. // Will fire for every connection.
  165. func (c *PubSub) OnInfo(fn func(*Connection, string)) {
  166. c.eventOnInfo = fn
  167. }
  168. // OnMessage is bind func to event.
  169. // Will fire for every connection.
  170. func (c *PubSub) OnMessage(fn func(*Connection, *Answer)) {
  171. c.eventOnMessage = fn
  172. }
  173. // OnPing is bind func to event.
  174. // Will fire for every connection.
  175. func (c *PubSub) OnPing(fn func(*Connection, time.Time)) {
  176. c.eventOnPing = fn
  177. }
  178. // OnPong is bind func to event.
  179. // Will fire for every connection.
  180. func (c *PubSub) OnPong(fn func(*Connection, time.Time, time.Time)) {
  181. c.eventOnPong = fn
  182. }