123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
// Package pubsub implements a client for the Twitch PubSub API that
// automatically takes care of the API limits. It also handles reconnections,
// ping/pong and maintenance requests automatically.
- package pubsub
- import (
- "context"
- "fmt"
- "net/url"
- "strings"
- "sync"
- "time"
- )
// Default address of the Twitch PubSub server and the topic limit applied by
// this package.
//
// https://dev.twitch.tv/docs/pubsub/#connection-management
const (
	// TwitchApiScheme is the URL scheme of the default server address.
	TwitchApiScheme = "wss"
	// TwitchApiHost is the host of the default server address.
	TwitchApiHost = "pubsub-edge.twitch.tv"
	// TwitchApiPath is the URL path of the default server address.
	TwitchApiPath = ""
	// TwitchApiMaxTopics is the largest number of topics Listen places on a
	// single connection before it opens another one.
	TwitchApiMaxTopics = 50
)
- // PubSub is represent of API client.
- type PubSub struct {
- sync.RWMutex
- URL url.URL
- Connections map[int64]*Connection
- // Events
- eventOnConnect func(*Connection)
- eventOnDisconnect func(*Connection)
- eventOnError func(*Connection, error)
- eventOnInfo func(*Connection, string)
- eventOnMessage func(*Connection, *Answer)
- eventOnPing func(*Connection, time.Time)
- eventOnPong func(*Connection, time.Time, time.Time)
- }
- // New create and returns new API client.
- func New() *PubSub {
- return NewWithURL(url.URL{
- Scheme: TwitchApiScheme,
- Host: TwitchApiHost,
- Path: TwitchApiPath,
- })
- }
- // NewWithURL create and returns new API client with custom API server URL.
- // It can be useful for testing.
- func NewWithURL(url url.URL) *PubSub {
- p := PubSub{
- URL: url,
- Connections: map[int64]*Connection{},
- }
- return &p
- }
- // -----------------------------------------------------------------------------
- func (p *PubSub) newConnection() *Connection {
- c := NewConnection(p.URL)
- c.OnConnect(p.eventOnConnect)
- c.OnDisconnect(p.eventOnDisconnect)
- c.OnError(p.eventOnError)
- c.OnInfo(p.eventOnInfo)
- c.OnMessage(p.eventOnMessage)
- c.OnPing(p.eventOnPing)
- c.OnPong(p.eventOnPong)
- return c
- }
- // -----------------------------------------------------------------------------
- // Listen is adding topics for listening. It take care of API limits.
- // New TCP connection will be created for every 50 topics.
- //
- // https://dev.twitch.tv/docs/pubsub/#connection-management
- func (p *PubSub) Listen(ctx context.Context, topic string, params ...interface{}) {
- p.Lock()
- defer p.Unlock()
- // Create and add first connection
- if len(p.Connections) <= 0 {
- c := p.newConnection()
- p.Connections[c.ID] = c
- }
- t := p.Topic(topic, params...)
- // Check topic in connection
- // Don't continue if already present
- for _, c := range p.Connections {
- select {
- case <-ctx.Done():
- return
- default:
- }
- if c.HasTopic(t) {
- return
- }
- }
- // Add topic to first not busy connection
- for _, c := range p.Connections {
- select {
- case <-ctx.Done():
- return
- default:
- }
- if c.TopicsCount() < TwitchApiMaxTopics {
- c.AddTopic(t)
- return
- }
- }
- // Create new one and add
- c := p.newConnection()
- p.Connections[c.ID] = c
- c.AddTopic(t)
- }
- // Unlisten is remove topics from listening. It take care of API limits too.
- // Connection count will automatically decrease of needs.
- //
- // https://dev.twitch.tv/docs/pubsub/#connection-management
- func (p *PubSub) Unlisten(ctx context.Context, topic string, params ...interface{}) {
- p.Lock()
- defer p.Unlock()
- t := p.Topic(topic, params...)
- // Search and unlisten
- for _, c := range p.Connections {
- select {
- case <-ctx.Done():
- return
- default:
- }
- if c.HasTopic(t) {
- c.RemoveTopic(t)
- // Must not contain duplicates
- // So just remove first and break
- break
- }
- }
- // Remove empty connections
- for i, c := range p.Connections {
- select {
- case <-ctx.Done():
- return
- default:
- }
- if c.TopicsCount() <= 0 {
- _ = c.Close()
- delete(p.Connections, i)
- // Can't be more than one connection without topics
- // So just close, remove first and break
- break
- }
- }
- }
- // Topics returns all current listen topics.
- func (p *PubSub) Topics() []string {
- p.Lock()
- defer p.Unlock()
- topics := []string{}
- for _, c := range p.Connections {
- for topic := range c.topics {
- topics = append(topics, topic)
- }
- }
- return topics
- }
- // HasTopic returns true if topic present.
- func (p *PubSub) HasTopic(topic string, params ...interface{}) bool {
- p.Lock()
- defer p.Unlock()
- t := p.Topic(topic, params...)
- for _, c := range p.Connections {
- if c.HasTopic(t) {
- return true
- }
- }
- return false
- }
- // TopicsCount return count of topics.
- func (p *PubSub) TopicsCount() int {
- p.Lock()
- defer p.Unlock()
- count := 0
- for _, c := range p.Connections {
- count += c.TopicsCount()
- }
- return count
- }
- // Topic generate correct topic for API.
- // Params can be as number or string.
- //
- // https://dev.twitch.tv/docs/pubsub/#topics
- func (p *PubSub) Topic(topic string, params ...interface{}) string {
- if len(params) <= 0 {
- return topic
- }
- var list []string
- for _, param := range params {
- list = append(list, fmt.Sprint(param))
- }
- return fmt.Sprintf("%s.%s", topic, strings.Join(list, "."))
- }
- // Close is close all connections.
- // Usually need to call at the end of app life.
- func (p *PubSub) Close() {
- p.Lock()
- defer p.Unlock()
- for i, c := range p.Connections {
- _ = c.Close()
- delete(p.Connections, i)
- }
- }
- // -----------------------------------------------------------------------------
- // OnConnect is bind func to event.
- // Will fire for every connection.
- func (c *PubSub) OnConnect(fn func(*Connection)) {
- c.eventOnConnect = fn
- }
- // OnDisconnect is bind func to event.
- // Will fire for every connection.
- func (c *PubSub) OnDisconnect(fn func(*Connection)) {
- c.eventOnDisconnect = fn
- }
- // OnError is bind func to event.
- // Will fire for every connection.
- func (c *PubSub) OnError(fn func(*Connection, error)) {
- c.eventOnError = fn
- }
- // OnInfo is bind func to event.
- // Will fire for every connection.
- func (c *PubSub) OnInfo(fn func(*Connection, string)) {
- c.eventOnInfo = fn
- }
- // OnMessage is bind func to event.
- // Will fire for every connection.
- func (c *PubSub) OnMessage(fn func(*Connection, *Answer)) {
- c.eventOnMessage = fn
- }
- // OnPing is bind func to event.
- // Will fire for every connection.
- func (c *PubSub) OnPing(fn func(*Connection, time.Time)) {
- c.eventOnPing = fn
- }
- // OnPong is bind func to event.
- // Will fire for every connection.
- func (c *PubSub) OnPong(fn func(*Connection, time.Time, time.Time)) {
- c.eventOnPong = fn
- }
|