tx.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package common
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "os"
  7. "time"
  8. )
  9. type Tx struct {
  10. tx *sql.Tx
  11. Debug bool
  12. Driver string
  13. start time.Time
  14. }
  15. func (t *Tx) fixQuery(query string) string {
  16. if t.Driver == "mysql" {
  17. return fixQuery(query)
  18. }
  19. return query
  20. }
  21. func (t *Tx) log(fname string, start time.Time, err error, tx bool, query string, args ...any) {
  22. if t.Debug {
  23. log(os.Stdout, fname, start, err, tx, query, args...)
  24. }
  25. }
  26. func (t *Tx) Commit() error {
  27. err := t.tx.Commit()
  28. t.log("Commit", t.start, err, true, "")
  29. return err
  30. }
  31. func (t *Tx) CurrentUnixTimestamp() int64 {
  32. return currentUnixTimestamp()
  33. }
  34. func (t *Tx) DeleteRowByID(ctx context.Context, id int64, row any) error {
  35. query := deleteRowByIDString(row)
  36. _, err := t.Exec(ctx, query, id)
  37. return err
  38. }
  39. func (t *Tx) Each(ctx context.Context, query string, callback func(ctx context.Context, rows *Rows) error, args ...any) error {
  40. if callback == nil {
  41. return fmt.Errorf("callback is not set")
  42. }
  43. rows, err := t.Query(ctx, query, args...)
  44. if err != nil {
  45. return err
  46. }
  47. defer rows.Close()
  48. for rows.Next() {
  49. select {
  50. case <-ctx.Done():
  51. return ctx.Err()
  52. default:
  53. if err := callback(ctx, rows); err != nil {
  54. return err
  55. }
  56. }
  57. }
  58. if err := rows.Err(); err != nil {
  59. return err
  60. }
  61. return nil
  62. }
  63. func (t *Tx) EachPrepared(ctx context.Context, prep *Prepared, callback func(ctx context.Context, rows *Rows) error) error {
  64. return t.Each(ctx, prep.Query, callback, prep.Args...)
  65. }
  66. func (t *Tx) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) {
  67. start := time.Now()
  68. res, err := t.tx.ExecContext(ctx, t.fixQuery(query), args...)
  69. t.log("Exec", start, err, true, t.fixQuery(query), args...)
  70. return res, err
  71. }
  72. func (t *Tx) ExecPrepared(ctx context.Context, prep *Prepared) (sql.Result, error) {
  73. return t.Exec(ctx, prep.Query, prep.Args...)
  74. }
  75. func (t *Tx) InsertRow(ctx context.Context, row any) error {
  76. query, args := insertRowString(row)
  77. _, err := t.Exec(ctx, query, args...)
  78. return err
  79. }
  80. func (t *Tx) PrepareSQL(query string, args ...any) *Prepared {
  81. return prepareSQL(query, args...)
  82. }
  83. func (t *Tx) Query(ctx context.Context, query string, args ...any) (*Rows, error) {
  84. start := time.Now()
  85. rows, err := t.tx.QueryContext(ctx, t.fixQuery(query), args...)
  86. t.log("Query", start, err, true, t.fixQuery(query), args...)
  87. return &Rows{Rows: rows}, err
  88. }
  89. func (t *Tx) QueryPrepared(ctx context.Context, prep *Prepared) (*Rows, error) {
  90. return t.Query(ctx, prep.Query, prep.Args...)
  91. }
  92. func (t *Tx) QueryRow(ctx context.Context, query string, args ...any) *Row {
  93. start := time.Now()
  94. row := t.tx.QueryRowContext(ctx, t.fixQuery(query), args...)
  95. t.log("QueryRow", start, nil, true, t.fixQuery(query), args...)
  96. return &Row{Row: row}
  97. }
  98. func (t *Tx) QueryRowByID(ctx context.Context, id int64, row any) error {
  99. query := queryRowByIDString(row)
  100. return t.QueryRow(ctx, query, id).Scans(row)
  101. }
  102. func (t *Tx) QueryRowPrepared(ctx context.Context, prep *Prepared) *Row {
  103. return t.QueryRow(ctx, prep.Query, prep.Args...)
  104. }
  105. func (t *Tx) RowExists(ctx context.Context, id int64, row any) bool {
  106. var exists int
  107. query := rowExistsString(row)
  108. if err := t.QueryRow(ctx, query, id).Scan(&exists); err == nil && exists == 1 {
  109. return true
  110. }
  111. return false
  112. }
  113. func (t *Tx) Rollback() error {
  114. err := t.tx.Rollback()
  115. t.log("Rollback", t.start, err, true, "")
  116. return err
  117. }
  118. func (t *Tx) UpdateRow(ctx context.Context, row any) error {
  119. query, args := updateRowString(row)
  120. _, err := t.Exec(ctx, query, args...)
  121. return err
  122. }
  123. func (t *Tx) UpdateRowOnly(ctx context.Context, row any, fields ...string) error {
  124. query, args := updateRowString(row, fields...)
  125. _, err := t.Exec(ctx, query, args...)
  126. return err
  127. }