Browse Source

SMTP sender by worker, db.Ping with context, fix code

Vova Tkach 5 years ago
parent
commit
3c03a4f210

+ 1 - 1
engine/engine.go

@@ -69,7 +69,7 @@ func (this *Engine) Process() bool {
 	}
 
 	// Backend must use MySQL anyway, so, check and connect
-	err := this.Wrap.UseDatabase()
+	err := this.Wrap.UseDatabase(this.Wrap.R.Context())
 	if err != nil {
 		utils.SystemErrorPageEngine(this.Wrap.W, err)
 		return true

+ 4 - 3
engine/sqlw/sqlw.go

@@ -4,6 +4,7 @@ package sqlw
 // https://golang.org/pkg/database/sql/
 
 import (
+	"context"
 	"database/sql"
 	_ "github.com/go-sql-driver/mysql"
 
@@ -44,13 +45,13 @@ func (this *DB) Close() error {
 	return this.db.Close()
 }
 
-func (this *DB) Ping() error {
+func (this *DB) Ping(ctx context.Context) error {
 	if consts.ParamDebug {
-		err := this.db.Ping()
+		err := this.db.PingContext(ctx)
 		log("[CM] PING", time.Now(), err, true)
 		return err
 	}
-	return this.db.Ping()
+	return this.db.PingContext(ctx)
 }
 
 func (this *DB) SetConnMaxLifetime(d time.Duration) {

+ 4 - 3
engine/wrapper/wrapper.go

@@ -2,6 +2,7 @@ package wrapper
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"html/template"
@@ -120,7 +121,7 @@ func (this *Wrapper) dbReconnect() error {
 	return nil
 }
 
-func (this *Wrapper) UseDatabase() error {
+func (this *Wrapper) UseDatabase(ctx context.Context) error {
 	this.DB = this.MSPool.Get(this.CurrHost)
 	if this.DB == nil {
 		if err := this.dbReconnect(); err != nil {
@@ -128,12 +129,12 @@ func (this *Wrapper) UseDatabase() error {
 		}
 	}
 
-	if err := this.DB.Ping(); err != nil {
+	if err := this.DB.Ping(ctx); err != nil {
 		this.DB.Close()
 		if err := this.dbReconnect(); err != nil {
 			return err
 		}
-		if err := this.DB.Ping(); err != nil {
+		if err := this.DB.Ping(ctx); err != nil {
 			this.DB.Close()
 			return err
 		}

+ 16 - 8
main.go

@@ -51,7 +51,9 @@ func main() {
 	}
 
 	// Run database migration
-	if err := support.New().Migration(consts.ParamWwwDir); err != nil {
+	// TODO: something need here for migration
+	ctx := context.Background()
+	if err := support.New().Migration(ctx, consts.ParamWwwDir); err != nil {
 		fmt.Printf("[ERROR] MIGRATION FAILED: %s\n", err)
 	}
 
@@ -73,6 +75,9 @@ func main() {
 	// Xml generation
 	wXmlGen := xml_generator(consts.ParamWwwDir, mpool)
 
+	// SMTP sender
+	wSmtpSnd := smtp_sender(consts.ParamWwwDir, mpool)
+
 	// Init mounted resources
 	res := resource.New()
 	assets.PopulateResources(res)
@@ -83,10 +88,6 @@ func main() {
 	// Init modules
 	mods := modules.New()
 
-	// SMTP sender
-	smtp_cl_ch, smtp_cl_stop := smtp_start(consts.ParamWwwDir, mpool)
-	defer smtp_stop(smtp_cl_ch, smtp_cl_stop)
-
 	// Shop basket
 	sb := basket.New()
 	sb_cl_ch, sb_cl_stop := basket_clean_start(sb)
@@ -135,17 +136,17 @@ func main() {
 		}
 
 		var res *resource.Resource
-		if v, ok := (*o)[2].(*resource.Resource); ok {
+		if v, ok := (*o)[6].(*resource.Resource); ok {
 			res = v
 		}
 
 		var stat *static.Static
-		if v, ok := (*o)[3].(*static.Static); ok {
+		if v, ok := (*o)[7].(*static.Static); ok {
 			stat = v
 		}
 
 		var mods *modules.Modules
-		if v, ok := (*o)[4].(*modules.Modules); ok {
+		if v, ok := (*o)[8].(*modules.Modules); ok {
 			mods = v
 		}
 		// ---
@@ -282,6 +283,12 @@ func main() {
 		var errs []string
 
 		// ---
+		if wSmtpSnd, ok := (*o)[5].(*worker.Worker); ok {
+			if err := wSmtpSnd.Shutdown(ctx); err != nil {
+				errs = append(errs, fmt.Sprintf("(%T): %s", wSmtpSnd, err.Error()))
+			}
+		}
+
 		if wXmlGen, ok := (*o)[4].(*worker.Worker); ok {
 			if err := wXmlGen.Shutdown(ctx); err != nil {
 				errs = append(errs, fmt.Sprintf("(%T): %s", wXmlGen, err.Error()))
@@ -335,6 +342,7 @@ func main() {
 				wSessCl,
 				wImageGen,
 				wXmlGen,
+				wSmtpSnd,
 				res,
 				stat,
 				mods,

+ 1 - 1
modules/module_index_act_cypress.go

@@ -24,7 +24,7 @@ func (this *Modules) RegisterAction_IndexCypressReset() *Action {
 			return
 		}
 		defer db.Close()
-		err = db.Ping()
+		err = db.Ping(wrap.R.Context())
 		if err != nil {
 			wrap.Write(err.Error())
 			return

+ 1 - 1
modules/module_index_act_mysql_setup.go

@@ -58,7 +58,7 @@ func (this *Modules) RegisterAction_IndexMysqlSetup() *Action {
 			return
 		}
 		defer db.Close()
-		err = db.Ping()
+		err = db.Ping(wrap.R.Context())
 		if err != nil {
 			wrap.MsgError(err.Error())
 			return

+ 2 - 2
modules/modules.go

@@ -297,7 +297,7 @@ func (this *Modules) XXXActionFire(wrap *wrapper.Wrapper) bool {
 			if name != "" {
 				if act, ok := this.acts[name]; ok {
 					if act.Info.WantDB {
-						err := wrap.UseDatabase()
+						err := wrap.UseDatabase(wrap.R.Context())
 						if err != nil {
 							this.XXXActionHeaders(wrap, http.StatusNotFound)
 							wrap.MsgError(err.Error())
@@ -345,7 +345,7 @@ func (this *Modules) XXXFrontEnd(wrap *wrapper.Wrapper) bool {
 		wrap.CurrModule = cm
 		if mod.Front != nil {
 			if mod.Info.WantDB {
-				err := wrap.UseDatabase()
+				err := wrap.UseDatabase(wrap.R.Context())
 				if err != nil {
 					utils.SystemErrorPageEngine(wrap.W, err)
 					return true

+ 58 - 68
smtp.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"html"
 	"io/ioutil"
@@ -12,13 +13,64 @@ import (
 	"golang-fave/engine/sqlw"
 	"golang-fave/engine/wrapper/config"
 	"golang-fave/utils"
+
+	"github.com/vladimirok5959/golang-worker/worker"
 )
 
-func smtp_send(host, port, user, pass, subject, msg string, receivers []string) error {
-	return utils.SMTPSend(host, port, user, pass, subject, msg, receivers)
+func smtp_sender(www_dir string, mp *mysqlpool.MySqlPool) *worker.Worker {
+	return worker.New(func(ctx context.Context, w *worker.Worker, o *[]worker.Iface) {
+		if www_dir, ok := (*o)[0].(string); ok {
+			if mp, ok := (*o)[1].(*mysqlpool.MySqlPool); ok {
+				smtp_loop(ctx, www_dir, mp)
+			}
+		}
+		select {
+		case <-ctx.Done():
+		case <-time.After(5 * time.Second):
+			return
+		}
+	}, &[]worker.Iface{
+		www_dir,
+		mp,
+	})
+}
+
+func smtp_loop(ctx context.Context, www_dir string, mp *mysqlpool.MySqlPool) {
+	dirs, err := ioutil.ReadDir(www_dir)
+	if err == nil {
+		for _, dir := range dirs {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				if mp != nil {
+					target_dir := strings.Join([]string{www_dir, dir.Name()}, string(os.PathSeparator))
+					if utils.IsDirExists(target_dir) {
+						smtp_process(ctx, target_dir, dir.Name(), mp)
+					}
+				}
+			}
+		}
+	}
 }
 
-func smtp_prepare(db *sqlw.DB, conf *config.Config) {
+func smtp_process(ctx context.Context, dir, host string, mp *mysqlpool.MySqlPool) {
+	db := mp.Get(host)
+	if db != nil {
+		conf := config.ConfigNew()
+		if err := conf.ConfigRead(strings.Join([]string{dir, "config", "config.json"}, string(os.PathSeparator))); err == nil {
+			if !((*conf).SMTP.Host == "" || (*conf).SMTP.Login == "" && (*conf).SMTP.Password == "") {
+				if err := db.Ping(ctx); err == nil {
+					smtp_prepare(ctx, db, conf)
+				}
+			}
+		} else {
+			fmt.Printf("Smtp error (config): %v\n", err)
+		}
+	}
+}
+
+func smtp_prepare(ctx context.Context, db *sqlw.DB, conf *config.Config) {
 	rows, err := db.Query(
 		`SELECT
 			id,
@@ -49,6 +101,7 @@ func smtp_prepare(db *sqlw.DB, conf *config.Config) {
 				); err == nil {
 					go func(db *sqlw.DB, conf *config.Config, id int, subject, msg string, receivers []string) {
 						if err := smtp_send(
+							ctx,
 							(*conf).SMTP.Host,
 							utils.IntToStr((*conf).SMTP.Port),
 							(*conf).SMTP.Login,
@@ -88,69 +141,6 @@ func smtp_prepare(db *sqlw.DB, conf *config.Config) {
 	}
 }
 
-func smtp_process(dir, host string, mp *mysqlpool.MySqlPool) {
-	db := mp.Get(host)
-	if db != nil {
-		conf := config.ConfigNew()
-		if err := conf.ConfigRead(strings.Join([]string{dir, "config", "config.json"}, string(os.PathSeparator))); err == nil {
-			if !((*conf).SMTP.Host == "" || (*conf).SMTP.Login == "" && (*conf).SMTP.Password == "") {
-				if err := db.Ping(); err == nil {
-					smtp_prepare(db, conf)
-				}
-			}
-		} else {
-			fmt.Printf("Smtp error (config): %v\n", err)
-		}
-	}
-}
-
-func smtp_loop(www_dir string, stop chan bool, mp *mysqlpool.MySqlPool) {
-	dirs, err := ioutil.ReadDir(www_dir)
-	if err == nil {
-		for _, dir := range dirs {
-			select {
-			case <-stop:
-				break
-			default:
-				if mp != nil {
-					target_dir := strings.Join([]string{www_dir, dir.Name()}, string(os.PathSeparator))
-					if utils.IsDirExists(target_dir) {
-						smtp_process(target_dir, dir.Name(), mp)
-					}
-				}
-			}
-		}
-	}
-}
-
-func smtp_start(www_dir string, mp *mysqlpool.MySqlPool) (chan bool, chan bool) {
-	ch := make(chan bool)
-	stop := make(chan bool)
-	go func() {
-		for {
-			select {
-			case <-time.After(5 * time.Second):
-				// Run every 5 seconds
-				smtp_loop(www_dir, stop, mp)
-			case <-ch:
-				ch <- true
-				return
-			}
-		}
-	}()
-	return ch, stop
-}
-
-func smtp_stop(ch, stop chan bool) {
-	for {
-		select {
-		case stop <- true:
-		case ch <- true:
-			<-ch
-			return
-		case <-time.After(3 * time.Second):
-			fmt.Println("Smtp error: force exit by timeout after 3 seconds")
-			return
-		}
-	}
+func smtp_send(ctx context.Context, host, port, user, pass, subject, msg string, receivers []string) error {
+	return utils.SMTPSend(host, port, user, pass, subject, msg, receivers)
 }

+ 5 - 4
support/support.go

@@ -1,6 +1,7 @@
 package support
 
 import (
+	"context"
 	"io/ioutil"
 	"os"
 	"regexp"
@@ -31,14 +32,14 @@ func (this *Support) isSettingsTableDoesntExist(err error) bool {
 	return false
 }
 
-func (this *Support) Migration(dir string) error {
+func (this *Support) Migration(ctx context.Context, dir string) error {
 	files, err := ioutil.ReadDir(dir)
 	if err != nil {
 		return err
 	}
 	for _, file := range files {
 		if utils.IsDir(dir + string(os.PathSeparator) + file.Name()) {
-			if err := this.Migrate(dir + string(os.PathSeparator) + file.Name()); err != nil {
+			if err := this.Migrate(ctx, dir+string(os.PathSeparator)+file.Name()); err != nil {
 				return err
 			}
 		}
@@ -46,7 +47,7 @@ func (this *Support) Migration(dir string) error {
 	return nil
 }
 
-func (this *Support) Migrate(host string) error {
+func (this *Support) Migrate(ctx context.Context, host string) error {
 	mysql_config_file := host + string(os.PathSeparator) + "config" + string(os.PathSeparator) + "mysql.json"
 	if utils.IsMySqlConfigExists(mysql_config_file) {
 		mc, err := utils.MySqlConfigRead(mysql_config_file)
@@ -57,7 +58,7 @@ func (this *Support) Migrate(host string) error {
 		if err != nil {
 			return err
 		}
-		if err := db.Ping(); err != nil {
+		if err := db.Ping(ctx); err != nil {
 			return err
 		}
 		defer db.Close()

+ 1 - 1
xml.go

@@ -59,7 +59,7 @@ func xml_detect(ctx context.Context, dir, host string, mp *mysqlpool.MySqlPool)
 	if db != nil {
 		trigger := strings.Join([]string{dir, "tmp", "trigger.xml.run"}, string(os.PathSeparator))
 		if utils.IsFileExists(trigger) {
-			if err := db.Ping(); err == nil {
+			if err := db.Ping(ctx); err == nil {
 				xml_create(ctx, dir, host, trigger, db)
 			}
 		}