Browse Source

Implement RemoveAllTopics func

Volodymyr Tkach 2 years ago
parent
commit
8ee14dd9d2
1 changed files with 23 additions and 1 deletions
  1. 23 1
      pubsub/connection.go

+ 23 - 1
pubsub/connection.go

@@ -112,11 +112,24 @@ func (c *Connection) onPong(start, end time.Time) {
 
 // -----------------------------------------------------------------------------
 
+// listenTopis is generate topics and send request to API.
+// Also it can close connection because it's API limits.
+// Each connection must listen at least one topic.
 func (c *Connection) listenTopis() {
 	topics := []string{}
 	for topic := range c.topics {
 		topics = append(topics, topic)
 	}
+
+	if len(topics) <= 0 {
+		c.active = false
+		if c.Connection != nil {
+			c.Connection.Close()
+		}
+		return
+	}
+
+	// Send LISTEN request
 	msg := Answer{Type: Listen, Data: AnswerDataTopics{Topics: topics}}.JSON()
 	if err := c.Connection.WriteMessage(websocket.TextMessage, msg); err != nil {
 		c.onError(err)
@@ -144,7 +157,7 @@ func (c *Connection) AddTopic(topic string) {
 	c.listenTopis()
 }
 
-// RemoveTopic is remove topics from listening.
+// RemoveTopic is remove topic from listening.
 func (c *Connection) RemoveTopic(topic string) {
 	if _, ok := c.topics[topic]; !ok {
 		return
@@ -157,6 +170,15 @@ func (c *Connection) RemoveTopic(topic string) {
 	c.listenTopis()
 }
 
+// RemoveAllTopics is remove all topics from listening.
+func (c *Connection) RemoveAllTopics() {
+	c.Lock()
+	defer c.Unlock()
+	c.topics = map[string]*struct{}{}
+
+	c.listenTopis()
+}
+
 // Topics returns all current listen topics.
 func (c *Connection) Topics() []string {
 	topics := []string{}