Browse Source

Initial commit

Volodymyr Tkach 2 years ago
parent
commit
ebde0f0bbe
10 changed files with 614 additions and 0 deletions
  1. 1 0
      .gitignore
  2. 15 0
      Makefile
  3. 20 0
      go.mod
  4. 92 0
      go.sum
  5. 209 0
      pubsub/connection.go
  6. 60 0
      pubsub/go_pinger.go
  7. 58 0
      pubsub/go_reader.go
  8. 49 0
      pubsub/go_reconnector.go
  9. 74 0
      pubsub/pubsub.go
  10. 36 0
      pubsub/pubsub_test.go

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+tmp.go

+ 15 - 0
Makefile

@@ -0,0 +1,15 @@
+default: test
+
+clean:
+	go clean -testcache ./...
+
+test:
+	go test ./...
+
+lint:
+	golangci-lint run --disable=structcheck
+
+tidy:
+	go mod tidy
+
+.PHONY: default clean test lint tidy

+ 20 - 0
go.mod

@@ -0,0 +1,20 @@
+module github.com/vladimirok5959/golang-twitch
+
+go 1.19
+
+require (
+	github.com/gorilla/websocket v1.5.0
+	github.com/onsi/ginkgo v1.16.5
+	github.com/onsi/gomega v1.26.0
+)
+
+require (
+	github.com/fsnotify/fsnotify v1.4.9 // indirect
+	github.com/google/go-cmp v0.5.9 // indirect
+	github.com/nxadm/tail v1.4.8 // indirect
+	golang.org/x/net v0.5.0 // indirect
+	golang.org/x/sys v0.4.0 // indirect
+	golang.org/x/text v0.6.0 // indirect
+	gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)

+ 92 - 0
go.sum

@@ -0,0 +1,92 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
+github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
+github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
+github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
+github.com/onsi/ginkgo/v2 v2.7.0 h1:/XxtEV3I3Eif/HobnVx9YmJgk8ENdRsuUmM+fLCFNow=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.26.0 h1:03cDLK28U6hWvCAns6NeydX3zIm4SF3ci69ulidS32Q=
+github.com/onsi/gomega v1.26.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
+golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
+golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
+golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 209 - 0
pubsub/connection.go

@@ -0,0 +1,209 @@
+package pubsub
+
+import (
+	"net/url"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/gorilla/websocket"
+)
+
+// At least once every 5 minutes by docs but better keep this at 30 seconds
+//
+// https://dev.twitch.tv/docs/pubsub/#connection-management
+const TwitchApiPingEach = 15 * time.Second    // 30 seconds
+const TwitchApiPingTimeout = 10 * time.Second // 10 seconds
+
+var nextConnectionID int64 = 0
+
+// Connection is represent of one connection.
+type Connection struct {
+	sync.RWMutex
+
+	done   chan struct{}
+	topics map[string]*struct{}
+	active bool
+	url    url.URL
+
+	ping_start  time.Time
+	ping_sended bool
+
+	ID         int64
+	Connection *websocket.Conn
+
+	// Events
+	eventOnConnect    func(*Connection)
+	eventOnDisconnect func(*Connection)
+	eventOnError      func(*Connection, error)
+	eventOnInfo       func(*Connection, string)
+	eventOnMessage    func(*Connection, []byte)
+	eventOnPing       func(*Connection, time.Time)
+	eventOnPong       func(*Connection, time.Time, time.Time)
+}
+
+// NewConnection create new connection.
+// Returns pointer to connection.
+func NewConnection(url url.URL) *Connection {
+	c := &Connection{
+		done:   make(chan struct{}),
+		topics: map[string]*struct{}{},
+		active: false,
+		url:    url,
+
+		ping_start:  time.Now(),
+		ping_sended: false,
+
+		ID:         nextConnectionID,
+		Connection: nil,
+	}
+
+	go_reconnector(c)
+	go_reader(c)
+	go_pinger(c)
+
+	nextConnectionID++
+
+	return c
+}
+
+// -----------------------------------------------------------------------------
+
+func (c *Connection) onConnect() {
+	if c.eventOnConnect != nil {
+		c.eventOnConnect(c)
+	}
+}
+
+func (c *Connection) onDisconnect() {
+	if c.eventOnDisconnect != nil {
+		c.eventOnDisconnect(c)
+	}
+}
+
+func (c *Connection) onError(err error) {
+	if c.eventOnError != nil {
+		c.eventOnError(c, err)
+	}
+}
+
+func (c *Connection) onInfo(str string) {
+	if c.eventOnInfo != nil {
+		c.eventOnInfo(c, str)
+	}
+}
+
+func (c *Connection) onMessage(msg []byte) {
+	if c.eventOnMessage != nil {
+		c.eventOnMessage(c, msg)
+	}
+}
+
+func (c *Connection) onPing(start time.Time) {
+	if c.eventOnPing != nil {
+		c.eventOnPing(c, start)
+	}
+}
+
+func (c *Connection) onPong(start, end time.Time) {
+	if c.eventOnPong != nil {
+		c.eventOnPong(c, start, end)
+	}
+}
+
+// -----------------------------------------------------------------------------
+
+func (c *Connection) listenTopis() {
+	topics := []string{}
+	for topic := range c.topics {
+		topics = append(topics, topic)
+	}
+	msg := `{"type":"LISTEN","nonce":"","data":{"topics":["` + strings.Join(topics, `","`) + `"]}}`
+	if err := c.Connection.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil {
+		c.onError(err)
+		c.active = false
+		c.onDisconnect()
+	}
+}
+
+// -----------------------------------------------------------------------------
+
+// AddTopic is adding topics for listening.
+func (c *Connection) AddTopic(topic string) {
+	if _, ok := c.topics[topic]; ok {
+		return
+	}
+
+	if len(c.topics) >= TwitchApiMaxTopics {
+		return
+	}
+
+	c.Lock()
+	defer c.Unlock()
+	c.topics[topic] = nil
+
+	// TODO: Send cmd...
+}
+
+// RemoveTopic is remove topics from listening.
+func (c *Connection) RemoveTopic(topic string) {
+	if _, ok := c.topics[topic]; !ok {
+		return
+	}
+
+	c.Lock()
+	defer c.Unlock()
+	delete(c.topics, topic)
+
+	// TODO: Send cmd...
+}
+
+// Topics returns all current listen topics.
+func (c *Connection) Topics() []string {
+	// TODO: ...
+	return []string{}
+}
+
+// Close is close connection and shutdown all goroutines.
+// Usually it's need to call before destroying.
+func (c *Connection) Close() error {
+	c.active = false
+	close(c.done)
+
+	// It can be not initialized
+	if c.Connection != nil {
+		return c.Connection.Close()
+	}
+
+	return nil
+}
+
+// -----------------------------------------------------------------------------
+
+func (c *Connection) OnConnect(fn func(*Connection)) {
+	c.eventOnConnect = fn
+}
+
+func (c *Connection) OnDisconnect(fn func(*Connection)) {
+	c.eventOnDisconnect = fn
+}
+
+func (c *Connection) OnError(fn func(*Connection, error)) {
+	c.eventOnError = fn
+}
+
+func (c *Connection) OnInfo(fn func(*Connection, string)) {
+	c.eventOnInfo = fn
+}
+
+func (c *Connection) OnMessage(fn func(*Connection, []byte)) {
+	c.eventOnMessage = fn
+}
+
+func (c *Connection) OnPing(fn func(*Connection, time.Time)) {
+	c.eventOnPing = fn
+}
+
+func (c *Connection) OnPong(fn func(*Connection, time.Time, time.Time)) {
+	c.eventOnPong = fn
+}

+ 60 - 0
pubsub/go_pinger.go

@@ -0,0 +1,60 @@
+package pubsub
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/gorilla/websocket"
+)
+
+func go_pinger(c *Connection) {
+	// Pinger (sender)
+	go func(c *Connection) {
+		for {
+			select {
+			case <-time.After(1 * time.Second):
+				if c.active && !c.ping_sended {
+					if time.Since(c.ping_start) > TwitchApiPingEach {
+						if err := c.Connection.WriteMessage(
+							websocket.TextMessage,
+							[]byte(`{"type":"PING"}`),
+						); err != nil {
+							c.onError(err)
+							c.active = false
+							c.onDisconnect()
+						} else {
+							c.ping_start = time.Now()
+							c.ping_sended = true
+							c.onPing(c.ping_start)
+						}
+					}
+				}
+			case <-c.done:
+				return
+			}
+		}
+	}(c)
+
+	// Pinger (handler)
+	go func(c *Connection) {
+		for {
+			select {
+			case <-time.After(1 * time.Second):
+				if c.active && c.ping_sended {
+					if time.Since(c.ping_start) > TwitchApiPingTimeout {
+						c.onInfo(fmt.Sprintf("warning, no PONG response more than %d seconds", TwitchApiPingTimeout))
+						c.active = false
+						c.onDisconnect()
+						c.ping_start = time.Now()
+						c.ping_sended = false
+						if err := c.Connection.Close(); err != nil {
+							c.onError(err)
+						}
+					}
+				}
+			case <-c.done:
+				return
+			}
+		}
+	}(c)
+}

+ 58 - 0
pubsub/go_reader.go

@@ -0,0 +1,58 @@
+package pubsub
+
+import (
+	"encoding/json"
+	"time"
+)
+
+func go_reader(c *Connection) {
+	go func(c *Connection) {
+		for {
+			select {
+			case <-c.done:
+				return
+			default:
+				if c.active {
+					_, msg, err := c.Connection.ReadMessage()
+					if err != nil {
+						c.onError(err)
+						c.active = false
+						c.onDisconnect()
+
+						// Wait 1 second or return immediately
+						select {
+						case <-time.After(time.Second):
+						case <-c.done:
+							return
+						}
+					} else {
+						var resp struct {
+							Type string `json:"type"`
+						}
+						if err := json.Unmarshal(msg, &resp); err != nil {
+							c.onError(err)
+						} else {
+							if resp.Type == "PONG" {
+								ct := time.Now()
+								c.onPong(c.ping_start, ct)
+								c.ping_start = ct
+								c.ping_sended = false
+							} else if resp.Type == "RECONNECT" {
+								// TODO: ...
+							} else {
+								c.onMessage(msg)
+							}
+						}
+					}
+				} else {
+					// Wait 1 second or return immediately
+					select {
+					case <-time.After(time.Second):
+					case <-c.done:
+						return
+					}
+				}
+			}
+		}
+	}(c)
+}

+ 49 - 0
pubsub/go_reconnector.go

@@ -0,0 +1,49 @@
+package pubsub
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/gorilla/websocket"
+)
+
+func go_reconnector(c *Connection) {
+	go func(c *Connection) {
+		for {
+			select {
+			case <-c.done:
+				return
+			default:
+				if !c.active && len(c.topics) > 0 {
+					c.onInfo(fmt.Sprintf("reconnecting to: %s", c.url.String()))
+					conn, _, err := websocket.DefaultDialer.Dial(c.url.String(), nil)
+					if err != nil {
+						c.onError(err)
+
+						// Wait 1 second or return immediately
+						select {
+						case <-time.After(time.Second):
+						case <-c.done:
+							return
+						}
+					} else {
+						c.onInfo("reconnected successfully")
+						c.Connection = conn
+						c.active = true
+						c.onConnect()
+
+						// Listen all topics
+						c.listenTopis()
+					}
+				} else {
+					// Wait 1 second or return immediately
+					select {
+					case <-time.After(time.Second):
+					case <-c.done:
+						return
+					}
+				}
+			}
+		}
+	}(c)
+}

+ 74 - 0
pubsub/pubsub.go

@@ -0,0 +1,74 @@
+// Package implements Twitch API PubSub and automatically take care of API
+// limit. Also it will handle automatically reconnections, ping/pong and
+// maintenance requests.
+package pubsub
+
+import (
+	"fmt"
+	"net/url"
+	"strings"
+)
+
+// Default Twitch server API credentials.
+//
+// https://dev.twitch.tv/docs/pubsub/#connection-management
+const TwitchApiScheme = "wss"
+const TwitchApiHost = "pubsub-edge.twitch.tv"
+const TwitchApiPath = ""
+
+const TwitchApiMaxTopics = 50
+
+// PubSub is represent of API client.
+type PubSub struct {
+	URL         url.URL
+	Connections map[int64]*Connection
+}
+
+// New create and returns new API client.
+func New() *PubSub {
+	return NewWithURL(url.URL{
+		Scheme: TwitchApiScheme,
+		Host:   TwitchApiHost,
+		Path:   TwitchApiPath,
+	})
+}
+
+// NewWithURL create and returns new API client with custom API server URL.
+// It can be useful for testing.
+func NewWithURL(url url.URL) *PubSub {
+	p := PubSub{URL: url}
+	return &p
+}
+
+// Listen is adding topics for listening. It take care of API limits.
+// New TCP connection will be created for every 50 topics.
+//
+// https://dev.twitch.tv/docs/pubsub/#connection-management
+func (p *PubSub) Listen(topic string, params ...interface{}) {
+	// TODO: ...
+}
+
+// Unlisten is remove topics from listening. It take care of API limits too.
+// Connection count will automatically decrease of needs.
+//
+// https://dev.twitch.tv/docs/pubsub/#connection-management
+func (p *PubSub) Unlisten(topic string, params ...interface{}) {
+	// TODO: ...
+}
+
+// Topic generate correct topic for API.
+// Params can be as number or string.
+//
+// https://dev.twitch.tv/docs/pubsub/#topics
+func (p *PubSub) Topic(topic string, params ...interface{}) string {
+	if len(params) <= 0 {
+		return topic
+	}
+
+	var list []string
+	for _, param := range params {
+		list = append(list, fmt.Sprint(param))
+	}
+
+	return fmt.Sprintf("%s.%s", topic, strings.Join(list, "."))
+}

+ 36 - 0
pubsub/pubsub_test.go

@@ -0,0 +1,36 @@
+package pubsub_test
+
+import (
+	"net/url"
+	"testing"
+
+	"github.com/vladimirok5959/golang-twitch/pubsub"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("PubSub", func() {
+	var ps *pubsub.PubSub
+
+	BeforeEach(func() {
+		ps = pubsub.NewWithURL(url.URL{Scheme: "ws", Host: "example.com", Path: ""})
+	})
+
+	Context("Topic", func() {
+		It("generate correct topic", func() {
+			Expect(ps.Topic("channel-bits-events-v1", 123)).To(Equal("channel-bits-events-v1.123"))
+			Expect(ps.Topic("channel-bits-events-v1", "123")).To(Equal("channel-bits-events-v1.123"))
+			Expect(ps.Topic("channel-bits-events-v1", 123, 456)).To(Equal("channel-bits-events-v1.123.456"))
+			Expect(ps.Topic("channel-bits-events-v1", 123, "456")).To(Equal("channel-bits-events-v1.123.456"))
+			Expect(ps.Topic("channel-bits-events-v1", "123", 456)).To(Equal("channel-bits-events-v1.123.456"))
+			Expect(ps.Topic("channel-bits-events-v1", "123", "456")).To(Equal("channel-bits-events-v1.123.456"))
+			Expect(ps.Topic("channel-bits-events-v1", 123, 456, 789)).To(Equal("channel-bits-events-v1.123.456.789"))
+		})
+	})
+})
+
+func TestSuite(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "PubSub")
+}