connection.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package pubsub
  2. import (
  3. "net/url"
  4. "sync"
  5. "time"
  6. "github.com/gorilla/websocket"
  7. )
  8. // At least once every 5 minutes by docs but better keep this at 30 seconds
  9. //
  10. // https://dev.twitch.tv/docs/pubsub/#connection-management
  11. const TwitchApiPingEach = 15 * time.Second // 30 seconds
  12. const TwitchApiPingTimeout = 10 * time.Second // 10 seconds
  13. var nextConnectionID int64 = 0
  14. // Connection is represent of one connection.
  15. type Connection struct {
  16. sync.RWMutex
  17. done chan struct{}
  18. topics map[string]*struct{}
  19. active bool
  20. url url.URL
  21. ping_start time.Time
  22. ping_sended bool
  23. ID int64
  24. Connection *websocket.Conn
  25. // Events
  26. eventOnConnect func(*Connection)
  27. eventOnDisconnect func(*Connection)
  28. eventOnError func(*Connection, error)
  29. eventOnInfo func(*Connection, string)
  30. eventOnMessage func(*Connection, []byte)
  31. eventOnPing func(*Connection, time.Time)
  32. eventOnPong func(*Connection, time.Time, time.Time)
  33. }
  34. // NewConnection create new connection.
  35. // Returns pointer to connection.
  36. func NewConnection(url url.URL) *Connection {
  37. c := &Connection{
  38. done: make(chan struct{}),
  39. topics: map[string]*struct{}{},
  40. active: false,
  41. url: url,
  42. ping_start: time.Now(),
  43. ping_sended: false,
  44. ID: nextConnectionID,
  45. Connection: nil,
  46. }
  47. go_reconnector(c)
  48. go_reader(c)
  49. go_pinger(c)
  50. nextConnectionID++
  51. return c
  52. }
  53. // -----------------------------------------------------------------------------
  54. func (c *Connection) onConnect() {
  55. if c.eventOnConnect != nil {
  56. c.eventOnConnect(c)
  57. }
  58. }
  59. func (c *Connection) onDisconnect() {
  60. if c.eventOnDisconnect != nil {
  61. c.eventOnDisconnect(c)
  62. }
  63. }
  64. func (c *Connection) onError(err error) {
  65. if c.eventOnError != nil {
  66. c.eventOnError(c, err)
  67. }
  68. }
  69. func (c *Connection) onInfo(str string) {
  70. if c.eventOnInfo != nil {
  71. c.eventOnInfo(c, str)
  72. }
  73. }
  74. func (c *Connection) onMessage(msg []byte) {
  75. if c.eventOnMessage != nil {
  76. c.eventOnMessage(c, msg)
  77. }
  78. }
  79. func (c *Connection) onPing(start time.Time) {
  80. if c.eventOnPing != nil {
  81. c.eventOnPing(c, start)
  82. }
  83. }
  84. func (c *Connection) onPong(start, end time.Time) {
  85. if c.eventOnPong != nil {
  86. c.eventOnPong(c, start, end)
  87. }
  88. }
  89. // -----------------------------------------------------------------------------
  90. func (c *Connection) listenTopis() {
  91. topics := []string{}
  92. for topic := range c.topics {
  93. topics = append(topics, topic)
  94. }
  95. msg := Answer{Type: Listen, Data: AnswerDataTopics{Topics: topics}}.JSON()
  96. if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil {
  97. c.onError(err)
  98. c.active = false
  99. c.onDisconnect()
  100. }
  101. }
  102. // -----------------------------------------------------------------------------
  103. // AddTopic is adding topics for listening.
  104. func (c *Connection) AddTopic(topic string) {
  105. if _, ok := c.topics[topic]; ok {
  106. return
  107. }
  108. if len(c.topics) >= TwitchApiMaxTopics {
  109. return
  110. }
  111. c.Lock()
  112. defer c.Unlock()
  113. c.topics[topic] = nil
  114. // TODO: Send cmd...
  115. }
  116. // RemoveTopic is remove topics from listening.
  117. func (c *Connection) RemoveTopic(topic string) {
  118. if _, ok := c.topics[topic]; !ok {
  119. return
  120. }
  121. c.Lock()
  122. defer c.Unlock()
  123. delete(c.topics, topic)
  124. // TODO: Send cmd...
  125. }
  126. // Topics returns all current listen topics.
  127. func (c *Connection) Topics() []string {
  128. // TODO: ...
  129. return []string{}
  130. }
  131. // Close is close connection and shutdown all goroutines.
  132. // Usually it's need to call before destroying.
  133. func (c *Connection) Close() error {
  134. c.active = false
  135. close(c.done)
  136. // It can be not initialized
  137. if c.Connection != nil {
  138. return c.Connection.Close()
  139. }
  140. return nil
  141. }
  142. // -----------------------------------------------------------------------------
  143. func (c *Connection) OnConnect(fn func(*Connection)) {
  144. c.eventOnConnect = fn
  145. }
  146. func (c *Connection) OnDisconnect(fn func(*Connection)) {
  147. c.eventOnDisconnect = fn
  148. }
  149. func (c *Connection) OnError(fn func(*Connection, error)) {
  150. c.eventOnError = fn
  151. }
  152. func (c *Connection) OnInfo(fn func(*Connection, string)) {
  153. c.eventOnInfo = fn
  154. }
  155. func (c *Connection) OnMessage(fn func(*Connection, []byte)) {
  156. c.eventOnMessage = fn
  157. }
  158. func (c *Connection) OnPing(fn func(*Connection, time.Time)) {
  159. c.eventOnPing = fn
  160. }
  161. func (c *Connection) OnPong(fn func(*Connection, time.Time, time.Time)) {
  162. c.eventOnPong = fn
  163. }