Browse Source

Complete PubSub

Implement HasTopic func
PubSub topic params fixes
Complete PubSub.Unlisten func
Complete PubSub.Listen func
Implement PubSub.Close
Volodymyr Tkach 2 years ago
parent
commit
1ee234c69b
3 changed files with 99 additions and 3 deletions
  1. 8 0
      pubsub/connection.go
  2. 90 3
      pubsub/pubsub.go
  3. 1 0
      pubsub/pubsub_test.go

+ 8 - 0
pubsub/connection.go

@@ -207,6 +207,14 @@ func (c *Connection) Topics() []string {
 	return topics
 }
 
+// HasTopic returns true if topic present.
+func (c *Connection) HasTopic(topic string) bool {
+	if _, ok := c.topics[topic]; ok {
+		return true
+	}
+	return false
+}
+
 // TopicsCount return count of topics.
 func (c *Connection) TopicsCount() int {
 	return len(c.topics)

+ 90 - 3
pubsub/pubsub.go

@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"net/url"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -21,6 +22,8 @@ const TwitchApiMaxTopics = 50
 
 // PubSub is represent of API client.
 type PubSub struct {
+	sync.RWMutex
+
 	URL         url.URL
 	Connections map[int64]*Connection
 
@@ -46,18 +49,65 @@ func New() *PubSub {
 // NewWithURL create and returns new API client with custom API server URL.
 // It can be useful for testing.
 func NewWithURL(url url.URL) *PubSub {
-	p := PubSub{URL: url}
+	p := PubSub{
+		URL:         url,
+		Connections: map[int64]*Connection{},
+	}
 	return &p
 }
 
 // -----------------------------------------------------------------------------
 
+func (p *PubSub) newConnection() *Connection {
+	c := NewConnection(p.URL)
+	c.OnConnect(p.eventOnConnect)
+	c.OnDisconnect(p.eventOnDisconnect)
+	c.OnError(p.eventOnError)
+	c.OnInfo(p.eventOnInfo)
+	c.OnMessage(p.eventOnMessage)
+	c.OnPing(p.eventOnPing)
+	c.OnPong(p.eventOnPong)
+	return c
+}
+
+// -----------------------------------------------------------------------------
+
 // Listen is adding topics for listening. It take care of API limits.
 // New TCP connection will be created for every 50 topics.
 //
 // https://dev.twitch.tv/docs/pubsub/#connection-management
 func (p *PubSub) Listen(topic string, params ...interface{}) {
-	// TODO: ...
+	p.Lock()
+	defer p.Unlock()
+
+	// Create and add first connection
+	if len(p.Connections) <= 0 {
+		c := p.newConnection()
+		p.Connections[c.ID] = c
+	}
+
+	t := p.Topic(topic, params...)
+
+	// Check topic in connection
+	// Don't continue if already present
+	for _, c := range p.Connections {
+		if c.HasTopic(t) {
+			return
+		}
+	}
+
+	// Add topic to first not busy connection
+	for _, c := range p.Connections {
+		if c.TopicsCount() < TwitchApiMaxTopics {
+			c.AddTopic(t)
+			return
+		}
+	}
+
+	// Create new one and add
+	c := p.newConnection()
+	p.Connections[c.ID] = c
+	c.AddTopic(t)
 }
 
 // Unlisten is remove topics from listening. It take care of API limits too.
@@ -65,7 +115,33 @@ func (p *PubSub) Listen(topic string, params ...interface{}) {
 //
 // https://dev.twitch.tv/docs/pubsub/#connection-management
 func (p *PubSub) Unlisten(topic string, params ...interface{}) {
-	// TODO: ...
+	p.Lock()
+	defer p.Unlock()
+
+	t := p.Topic(topic, params...)
+
+	// Search and unlisten
+	for _, c := range p.Connections {
+		if c.HasTopic(t) {
+			c.RemoveTopic(t)
+
+			// Must not contain duplicates
+			// So just remove first and break
+			break
+		}
+	}
+
+	// Remove empty connections
+	for i, c := range p.Connections {
+		if c.TopicsCount() <= 0 {
+			_ = c.Close()
+			delete(p.Connections, i)
+
+			// Can't be more than one connection without topics
+			// So just close, remove first and break
+			break
+		}
+	}
 }
 
 // Topic generate correct topic for API.
@@ -85,6 +161,17 @@ func (p *PubSub) Topic(topic string, params ...interface{}) string {
 	return fmt.Sprintf("%s.%s", topic, strings.Join(list, "."))
 }
 
+// Close is close all connections.
+func (p *PubSub) Close() {
+	p.Lock()
+	defer p.Unlock()
+
+	for i, c := range p.Connections {
+		_ = c.Close()
+		delete(p.Connections, i)
+	}
+}
+
 // -----------------------------------------------------------------------------
 
 func (c *PubSub) OnConnect(fn func(*Connection)) {

+ 1 - 0
pubsub/pubsub_test.go

@@ -19,6 +19,7 @@ var _ = Describe("PubSub", func() {
 
 	Context("Topic", func() {
 		It("generate correct topic", func() {
+			Expect(ps.Topic("channel-bits-events-v1.123")).To(Equal("channel-bits-events-v1.123"))
 			Expect(ps.Topic("channel-bits-events-v1", 123)).To(Equal("channel-bits-events-v1.123"))
 			Expect(ps.Topic("channel-bits-events-v1", "123")).To(Equal("channel-bits-events-v1.123"))
 			Expect(ps.Topic("channel-bits-events-v1", 123, 456)).To(Equal("channel-bits-events-v1.123.456"))