connection_go18.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. // Go MySQL Driver - A MySQL-Driver for Go's database/sql package
  2. //
  3. // Copyright 2012 The Go-MySQL-Driver Authors. All rights reserved.
  4. //
  5. // This Source Code Form is subject to the terms of the Mozilla Public
  6. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  7. // You can obtain one at http://mozilla.org/MPL/2.0/.
  8. // +build go1.8
  9. package mysql
  10. import (
  11. "context"
  12. "database/sql"
  13. "database/sql/driver"
  14. )
  15. // Ping implements driver.Pinger interface
  16. func (mc *mysqlConn) Ping(ctx context.Context) (err error) {
  17. if mc.closed.IsSet() {
  18. errLog.Print(ErrInvalidConn)
  19. return driver.ErrBadConn
  20. }
  21. if err = mc.watchCancel(ctx); err != nil {
  22. return
  23. }
  24. defer mc.finish()
  25. if err = mc.writeCommandPacket(comPing); err != nil {
  26. return
  27. }
  28. return mc.readResultOK()
  29. }
  30. // BeginTx implements driver.ConnBeginTx interface
  31. func (mc *mysqlConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
  32. if err := mc.watchCancel(ctx); err != nil {
  33. return nil, err
  34. }
  35. defer mc.finish()
  36. if sql.IsolationLevel(opts.Isolation) != sql.LevelDefault {
  37. level, err := mapIsolationLevel(opts.Isolation)
  38. if err != nil {
  39. return nil, err
  40. }
  41. err = mc.exec("SET TRANSACTION ISOLATION LEVEL " + level)
  42. if err != nil {
  43. return nil, err
  44. }
  45. }
  46. return mc.begin(opts.ReadOnly)
  47. }
  48. func (mc *mysqlConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
  49. dargs, err := namedValueToValue(args)
  50. if err != nil {
  51. return nil, err
  52. }
  53. if err := mc.watchCancel(ctx); err != nil {
  54. return nil, err
  55. }
  56. rows, err := mc.query(query, dargs)
  57. if err != nil {
  58. mc.finish()
  59. return nil, err
  60. }
  61. rows.finish = mc.finish
  62. return rows, err
  63. }
  64. func (mc *mysqlConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
  65. dargs, err := namedValueToValue(args)
  66. if err != nil {
  67. return nil, err
  68. }
  69. if err := mc.watchCancel(ctx); err != nil {
  70. return nil, err
  71. }
  72. defer mc.finish()
  73. return mc.Exec(query, dargs)
  74. }
  75. func (mc *mysqlConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
  76. if err := mc.watchCancel(ctx); err != nil {
  77. return nil, err
  78. }
  79. stmt, err := mc.Prepare(query)
  80. mc.finish()
  81. if err != nil {
  82. return nil, err
  83. }
  84. select {
  85. default:
  86. case <-ctx.Done():
  87. stmt.Close()
  88. return nil, ctx.Err()
  89. }
  90. return stmt, nil
  91. }
  92. func (stmt *mysqlStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
  93. dargs, err := namedValueToValue(args)
  94. if err != nil {
  95. return nil, err
  96. }
  97. if err := stmt.mc.watchCancel(ctx); err != nil {
  98. return nil, err
  99. }
  100. rows, err := stmt.query(dargs)
  101. if err != nil {
  102. stmt.mc.finish()
  103. return nil, err
  104. }
  105. rows.finish = stmt.mc.finish
  106. return rows, err
  107. }
  108. func (stmt *mysqlStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
  109. dargs, err := namedValueToValue(args)
  110. if err != nil {
  111. return nil, err
  112. }
  113. if err := stmt.mc.watchCancel(ctx); err != nil {
  114. return nil, err
  115. }
  116. defer stmt.mc.finish()
  117. return stmt.Exec(dargs)
  118. }
  119. func (mc *mysqlConn) watchCancel(ctx context.Context) error {
  120. if mc.watching {
  121. // Reach here if canceled,
  122. // so the connection is already invalid
  123. mc.cleanup()
  124. return nil
  125. }
  126. // When ctx is already cancelled, don't watch it.
  127. if err := ctx.Err(); err != nil {
  128. return err
  129. }
  130. // When ctx is not cancellable, don't watch it.
  131. if ctx.Done() == nil {
  132. return nil
  133. }
  134. // When watcher is not alive, can't watch it.
  135. if mc.watcher == nil {
  136. return nil
  137. }
  138. mc.watching = true
  139. mc.watcher <- ctx
  140. return nil
  141. }
  142. func (mc *mysqlConn) startWatcher() {
  143. watcher := make(chan mysqlContext, 1)
  144. mc.watcher = watcher
  145. finished := make(chan struct{})
  146. mc.finished = finished
  147. go func() {
  148. for {
  149. var ctx mysqlContext
  150. select {
  151. case ctx = <-watcher:
  152. case <-mc.closech:
  153. return
  154. }
  155. select {
  156. case <-ctx.Done():
  157. mc.cancel(ctx.Err())
  158. case <-finished:
  159. case <-mc.closech:
  160. return
  161. }
  162. }
  163. }()
  164. }
  165. func (mc *mysqlConn) CheckNamedValue(nv *driver.NamedValue) (err error) {
  166. nv.Value, err = converter{}.ConvertValue(nv.Value)
  167. return
  168. }
  169. // ResetSession implements driver.SessionResetter.
  170. // (From Go 1.10)
  171. func (mc *mysqlConn) ResetSession(ctx context.Context) error {
  172. if mc.closed.IsSet() {
  173. return driver.ErrBadConn
  174. }
  175. return nil
  176. }