@ -7,8 +7,10 @@ package git
import (
import (
"context"
"context"
"crypto/sha1"
"crypto/sha1"
"errors"
"fmt"
"fmt"
"net/url"
"net/url"
"strconv"
"strings"
"strings"
"time"
"time"
@ -49,79 +51,67 @@ func init() {
db . RegisterModel ( new ( CommitStatusIndex ) )
db . RegisterModel ( new ( CommitStatusIndex ) )
}
}
// upsertCommitStatusIndex the function will not return until it acquires the lock or receives an error.
func postgresGetCommitStatusIndex ( ctx context . Context , repoID int64 , sha string ) ( int64 , error ) {
func upsertCommitStatusIndex ( ctx context . Context , repoID int64 , sha string ) ( err error ) {
res , err := db . GetEngine ( ctx ) . Query ( "INSERT INTO `commit_status_index` (repo_id, sha, max_index) " +
// An atomic UPSERT operation (INSERT/UPDATE) is the only operation
"VALUES (?,?,1) ON CONFLICT (repo_id, sha) DO UPDATE SET max_index = `commit_status_index`.max_index+1 RETURNING max_index" ,
// that ensures that the key is actually locked.
repoID , sha )
switch {
if err != nil {
case setting . Database . UseSQLite3 || setting . Database . UsePostgreSQL :
return 0 , err
_ , err = db . Exec ( ctx , "INSERT INTO `commit_status_index` (repo_id, sha, max_index) " +
}
"VALUES (?,?,1) ON CONFLICT (repo_id,sha) DO UPDATE SET max_index = `commit_status_index`.max_index+1" ,
if len ( res ) == 0 {
repoID , sha )
return 0 , db . ErrGetResourceIndexFailed
case setting . Database . UseMySQL :
_ , err = db . Exec ( ctx , "INSERT INTO `commit_status_index` (repo_id, sha, max_index) " +
"VALUES (?,?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1" ,
repoID , sha )
case setting . Database . UseMSSQL :
// https://weblogs.sqlteam.com/dang/2009/01/31/upsert-race-condition-with-merge/
_ , err = db . Exec ( ctx , "MERGE `commit_status_index` WITH (HOLDLOCK) as target " +
"USING (SELECT ? AS repo_id, ? AS sha) AS src " +
"ON src.repo_id = target.repo_id AND src.sha = target.sha " +
"WHEN MATCHED THEN UPDATE SET target.max_index = target.max_index+1 " +
"WHEN NOT MATCHED THEN INSERT (repo_id, sha, max_index) " +
"VALUES (src.repo_id, src.sha, 1);" ,
repoID , sha )
default :
return fmt . Errorf ( "database type not supported" )
}
}
return err
return strconv . ParseInt ( string ( res [ 0 ] [ "max_index" ] ) , 10 , 64 )
}
}
// GetNextCommitStatusIndex retried 3 times to generate a resource index
// GetNextCommitStatusIndex retried 3 times to generate a resource index
func GetNextCommitStatusIndex ( repoID int64 , sha string ) ( int64 , error ) {
func GetNextCommitStatusIndex ( ctx context . Context , repoID int64 , sha string ) ( int64 , error ) {
for i := 0 ; i < db . MaxDupIndexAttempts ; i ++ {
if setting . Database . UsePostgreSQL {
idx , err := getNextCommitStatusIndex ( repoID , sha )
return postgresGetCommitStatusIndex ( ctx , repoID , sha )
if err == db . ErrResouceOutdated {
continue
}
if err != nil {
return 0 , err
}
return idx , nil
}
}
return 0 , db . ErrGetResourceIndexFailed
}
// getNextCommitStatusIndex return the next index
e := db . GetEngine ( ctx )
func getNextCommitStatusIndex ( repoID int64 , sha string ) ( int64 , error ) {
ctx , commiter , err := db . TxContext ( )
// try to update the max_index to next value, and acquire the write-lock for the record
res , err := e . Exec ( "UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?" , repoID , sha )
if err != nil {
if err != nil {
return 0 , err
return 0 , err
}
}
defer commiter . Close ( )
affected , err := res . RowsAffected ( )
var preIdx int64
_ , err = db . GetEngine ( ctx ) . SQL ( "SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ?" , repoID , sha ) . Get ( & preIdx )
if err != nil {
if err != nil {
return 0 , err
return 0 , err
}
}
if affected == 0 {
// this slow path is only for the first time of creating a resource index
_ , errIns := e . Exec ( "INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)" , repoID , sha )
res , err = e . Exec ( "UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?" , repoID , sha )
if err != nil {
return 0 , err
}
if err := upsertCommitStatusIndex ( ctx , repoID , sha ) ; err != nil {
affected , err = res . RowsAffected ( )
return 0 , err
if err != nil {
return 0 , err
}
// if the update still can not update any records, the record must not exist and there must be some errors (insert error)
if affected == 0 {
if errIns == nil {
return 0 , errors . New ( "impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated" )
}
return 0 , errIns
}
}
}
var curIdx int64
// now, the new index is in database (protected by the transaction and write-lock)
has , err := db . GetEngine ( ctx ) . SQL ( "SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ? AND max_index=?" , repoID , sha , preIdx + 1 ) . Get ( & curIdx )
var newIdx int64
has , err := e . SQL ( "SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?" , repoID , sha ) . Get ( & newIdx )
if err != nil {
if err != nil {
return 0 , err
return 0 , err
}
}
if ! has {
if ! has {
return 0 , db . ErrResouceOutdated
return 0 , errors . New ( "impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected" )
}
if err := commiter . Commit ( ) ; err != nil {
return 0 , err
}
}
return cur Idx, nil
return newIdx , nil
}
}
func ( status * CommitStatus ) loadAttributes ( ctx context . Context ) ( err error ) {
func ( status * CommitStatus ) loadAttributes ( ctx context . Context ) ( err error ) {
@ -291,18 +281,18 @@ func NewCommitStatus(opts NewCommitStatusOptions) error {
return fmt . Errorf ( "NewCommitStatus[%s, %s]: no user specified" , repoPath , opts . SHA )
return fmt . Errorf ( "NewCommitStatus[%s, %s]: no user specified" , repoPath , opts . SHA )
}
}
// Get the next Status Index
idx , err := GetNextCommitStatusIndex ( opts . Repo . ID , opts . SHA )
if err != nil {
return fmt . Errorf ( "generate commit status index failed: %w" , err )
}
ctx , committer , err := db . TxContext ( )
ctx , committer , err := db . TxContext ( )
if err != nil {
if err != nil {
return fmt . Errorf ( "NewCommitStatus[repo_id: %d, user_id: %d, sha: %s]: %w" , opts . Repo . ID , opts . Creator . ID , opts . SHA , err )
return fmt . Errorf ( "NewCommitStatus[repo_id: %d, user_id: %d, sha: %s]: %w" , opts . Repo . ID , opts . Creator . ID , opts . SHA , err )
}
}
defer committer . Close ( )
defer committer . Close ( )
// Get the next Status Index
idx , err := GetNextCommitStatusIndex ( ctx , opts . Repo . ID , opts . SHA )
if err != nil {
return fmt . Errorf ( "generate commit status index failed: %w" , err )
}
opts . CommitStatus . Description = strings . TrimSpace ( opts . CommitStatus . Description )
opts . CommitStatus . Description = strings . TrimSpace ( opts . CommitStatus . Description )
opts . CommitStatus . Context = strings . TrimSpace ( opts . CommitStatus . Context )
opts . CommitStatus . Context = strings . TrimSpace ( opts . CommitStatus . Context )
opts . CommitStatus . TargetURL = strings . TrimSpace ( opts . CommitStatus . TargetURL )
opts . CommitStatus . TargetURL = strings . TrimSpace ( opts . CommitStatus . TargetURL )
@ -316,7 +306,7 @@ func NewCommitStatus(opts NewCommitStatusOptions) error {
// Insert new CommitStatus
// Insert new CommitStatus
if _ , err = db . GetEngine ( ctx ) . Insert ( opts . CommitStatus ) ; err != nil {
if _ , err = db . GetEngine ( ctx ) . Insert ( opts . CommitStatus ) ; err != nil {
return fmt . Errorf ( "I nsert CommitStatus[%s, %s]: %w" , repoPath , opts . SHA , err )
return fmt . Errorf ( "i nsert CommitStatus[%s, %s]: %w" , repoPath , opts . SHA , err )
}
}
return committer . Commit ( )
return committer . Commit ( )