package pubsub import ( "net/url" "sync" "time" "github.com/gorilla/websocket" ) // At least once every 5 minutes by docs but better keep this at 30 seconds // // https://dev.twitch.tv/docs/pubsub/#connection-management const TwitchApiPingEach = 15 * time.Second // 30 seconds const TwitchApiPingTimeout = 10 * time.Second // 10 seconds var nextConnectionID int64 = 0 // Connection is represent of one connection. type Connection struct { sync.RWMutex done chan struct{} topics map[string]*struct{} active bool url url.URL ping_start time.Time ping_sended bool ID int64 Connection *websocket.Conn // Events eventOnConnect func(*Connection) eventOnDisconnect func(*Connection) eventOnError func(*Connection, error) eventOnInfo func(*Connection, string) eventOnMessage func(*Connection, *Answer) eventOnPing func(*Connection, time.Time) eventOnPong func(*Connection, time.Time, time.Time) } // NewConnection create new connection. // Returns pointer to connection. func NewConnection(url url.URL) *Connection { c := &Connection{ done: make(chan struct{}), topics: map[string]*struct{}{}, active: false, url: url, ping_start: time.Now(), ping_sended: false, ID: nextConnectionID, Connection: nil, } go_reconnector(c) go_reader(c) go_pinger(c) nextConnectionID++ return c } // ----------------------------------------------------------------------------- func (c *Connection) onConnect() { if c.eventOnConnect != nil { c.eventOnConnect(c) } } func (c *Connection) onDisconnect() { if c.eventOnDisconnect != nil { c.eventOnDisconnect(c) } } func (c *Connection) onError(err error) { if c.eventOnError != nil { c.eventOnError(c, err) } } func (c *Connection) onInfo(str string) { if c.eventOnInfo != nil { c.eventOnInfo(c, str) } } func (c *Connection) onMessage(msg *Answer) { if c.eventOnMessage != nil { c.eventOnMessage(c, msg) } } func (c *Connection) onPing(start time.Time) { if c.eventOnPing != nil { c.eventOnPing(c, start) } } func (c *Connection) onPong(start, end time.Time) { if c.eventOnPong != nil { c.eventOnPong(c, start, end) } } // ----------------------------------------------------------------------------- // listenTopis is generate topics and send request to API. // Also it can close connection because it's API limits. // Each connection must listen at least one topic. func (c *Connection) listenTopis() { topics := []string{} for topic := range c.topics { topics = append(topics, topic) } // No topics, close connection if len(topics) <= 0 { c.active = false if c.Connection != nil { c.Connection.Close() } return } // Send LISTEN request if c.Connection != nil && c.active { // TODO: track bad topics and auto remove? [FUTURE] // One bad topic will break all next topics // The error message associated with the request, or an empty string if there is no error. // For Bits and whispers events requests, error responses can be: // ERR_BADMESSAGE, ERR_BADAUTH, ERR_SERVER, ERR_BADTOPIC msg := Answer{Type: Listen, Data: AnswerDataTopics{Topics: topics}}.JSON() if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil { c.onError(err) c.active = false c.onDisconnect() } } } // ----------------------------------------------------------------------------- // AddTopic is adding topics for listening. func (c *Connection) AddTopic(topic string) { if _, ok := c.topics[topic]; ok { return } if len(c.topics) >= TwitchApiMaxTopics { return } c.Lock() defer c.Unlock() c.topics[topic] = nil c.listenTopis() } // RemoveTopic is remove topic from listening. func (c *Connection) RemoveTopic(topic string) { if _, ok := c.topics[topic]; !ok { return } c.Lock() defer c.Unlock() delete(c.topics, topic) // Send UNLISTEN request if c.Connection != nil && c.active { msg := Answer{Type: Unlisten, Data: AnswerDataTopics{Topics: []string{topic}}}.JSON() if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil { c.onError(err) c.active = false c.onDisconnect() } } // No topics, close connection if len(c.topics) <= 0 { c.active = false if c.Connection != nil { c.Connection.Close() } } } // RemoveAllTopics is remove all topics from listening. func (c *Connection) RemoveAllTopics() { c.Lock() defer c.Unlock() c.topics = map[string]*struct{}{} c.listenTopis() } // Topics returns all current listen topics. func (c *Connection) Topics() []string { c.Lock() defer c.Unlock() topics := []string{} for topic := range c.topics { topics = append(topics, topic) } return topics } // HasTopic returns true if topic present. func (c *Connection) HasTopic(topic string) bool { if _, ok := c.topics[topic]; ok { return true } return false } // TopicsCount return count of topics. func (c *Connection) TopicsCount() int { return len(c.topics) } // Close is close connection and shutdown all goroutines. // Usually it's need to call before destroying. func (c *Connection) Close() error { c.active = false close(c.done) // It can be not initialized if c.Connection != nil { return c.Connection.Close() } return nil } // ----------------------------------------------------------------------------- func (c *Connection) OnConnect(fn func(*Connection)) { c.eventOnConnect = fn } func (c *Connection) OnDisconnect(fn func(*Connection)) { c.eventOnDisconnect = fn } func (c *Connection) OnError(fn func(*Connection, error)) { c.eventOnError = fn } func (c *Connection) OnInfo(fn func(*Connection, string)) { c.eventOnInfo = fn } func (c *Connection) OnMessage(fn func(*Connection, *Answer)) { c.eventOnMessage = fn } func (c *Connection) OnPing(fn func(*Connection, time.Time)) { c.eventOnPing = fn } func (c *Connection) OnPong(fn func(*Connection, time.Time, time.Time)) { c.eventOnPong = fn }