@ -6,22 +6,20 @@
package archiver
package archiver
import (
import (
"errors"
"fmt"
"io"
"io"
"io/ioutil"
"os"
"os"
"path"
"regexp"
"regexp"
"strings"
"strings"
"sync"
"time"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util "
"code.gitea.io/gitea/modules/storage "
)
)
// ArchiveRequest defines the parameters of an archive request, which notably
// ArchiveRequest defines the parameters of an archive request, which notably
@ -30,223 +28,174 @@ import (
// This is entirely opaque to external entities, though, and mostly used as a
// This is entirely opaque to external entities, though, and mostly used as a
// handle elsewhere.
// handle elsewhere.
type ArchiveRequest struct {
type ArchiveRequest struct {
uri string
RepoID int64
repo * git . Repository
refName string
refName string
ext string
Type git . ArchiveType
archivePath string
CommitID string
archiveType git . ArchiveType
archiveComplete bool
commit * git . Commit
cchan chan struct { }
}
}
var archiveInProgress [ ] * ArchiveRequest
var archiveMutex sync . Mutex
// SHA1 hashes will only go up to 40 characters, but SHA256 hashes will go all
// SHA1 hashes will only go up to 40 characters, but SHA256 hashes will go all
// the way to 64.
// the way to 64.
var shaRegex = regexp . MustCompile ( ` ^[0-9a-f] { 4,64}$ ` )
var shaRegex = regexp . MustCompile ( ` ^[0-9a-f] { 4,64}$ ` )
// These facilitate testing, by allowing the unit tests to control (to some extent)
// NewRequest creates an archival request, based on the URI. The
// the goroutine used for processing the queue.
// resulting ArchiveRequest is suitable for being passed to ArchiveRepository()
var archiveQueueMutex * sync . Mutex
// if it's determined that the request still needs to be satisfied.
var archiveQueueStartCond * sync . Cond
func NewRequest ( repoID int64 , repo * git . Repository , uri string ) ( * ArchiveRequest , error ) {
var archiveQueueReleaseCond * sync . Cond
r := & ArchiveRequest {
RepoID : repoID ,
// GetArchivePath reports the path stored on this request from which the
// archive can be served.
func (aReq *ArchiveRequest) GetArchivePath() string {
	servePath := aReq.archivePath
	return servePath
}
// GetArchiveName builds the archive's name by appending the request's
// extension to the ref name the caller used to create this request.
func (aReq *ArchiveRequest) GetArchiveName() string {
	name := aReq.refName
	name += aReq.ext
	return name
}
// IsComplete reports whether this request has been flagged as complete,
// i.e. the current value of its archiveComplete field.
func (aReq *ArchiveRequest) IsComplete() bool {
	finished := aReq.archiveComplete
	return finished
}
}
// WaitForCompletion will wait for this request to complete, with no timeout.
var ext string
// It returns whether the archive was actually completed, as the channel could
switch {
// have also been closed due to an error.
case strings . HasSuffix ( uri , ".zip" ) :
func ( aReq * ArchiveRequest ) WaitForCompletion ( ctx * context . Context ) bool {
ext = ".zip"
select {
r . Type = git . ZIP
case <- aReq . cchan :
case strings . HasSuffix ( uri , ".tar.gz" ) :
case <- ctx . Done ( ) :
ext = ".tar.gz"
r . Type = git . TARGZ
default :
return nil , fmt . Errorf ( "Unknown format: %s" , uri )
}
}
return aReq . IsComplete ( )
r . refName = strings . TrimSuffix ( uri , ext )
}
// TimedWaitForCompletion will wait for this request to complete, with timeout
var err error
// happening after the specified Duration. It returns whether the archive is
// Get corresponding commit.
// now complete and whether we hit the timeout or not. The latter may not be
if repo . IsBranchExist ( r . refName ) {
// useful if the request is complete or we started to shutdown.
r . CommitID , err = repo . GetBranchCommitID ( r . refName )
func ( aReq * ArchiveRequest ) TimedWaitForCompletion ( ctx * context . Context , dur time . Duration ) ( bool , bool ) {
if err != nil {
timeout := false
return nil , err
select {
case <- time . After ( dur ) :
timeout = true
case <- aReq . cchan :
case <- ctx . Done ( ) :
}
}
} else if repo . IsTagExist ( r . refName ) {
return aReq . IsComplete ( ) , timeout
r . CommitID , err = repo . GetTagCommitID ( r . refName )
if err != nil {
return nil , err
}
}
} else if shaRegex . MatchString ( r . refName ) {
// The caller must hold the archiveMutex across calls to getArchiveRequest.
if repo . IsCommitExist ( r . refName ) {
func getArchiveRequest ( repo * git . Repository , commit * git . Commit , archiveType git . ArchiveType ) * ArchiveRequest {
r . CommitID = r . refName
for _ , r := range archiveInProgress {
} else {
// Need to be referring to the same repository.
return nil , git . ErrNotExist {
if r . repo . Path == repo . Path && r . commit . ID == commit . ID && r . archiveType == archiveType {
ID : r . refName ,
return r
}
}
}
}
return nil
} else {
return nil , fmt . Errorf ( "Unknow ref %s type" , r . refName )
}
}
// DeriveRequestFrom creates an archival request, based on the URI. The
return r , nil
// resulting ArchiveRequest is suitable for being passed to ArchiveRepository()
// if it's determined that the request still needs to be satisfied.
func DeriveRequestFrom ( ctx * context . Context , uri string ) * ArchiveRequest {
if ctx . Repo == nil || ctx . Repo . GitRepo == nil {
log . Trace ( "Repo not initialized" )
return nil
}
}
r := & ArchiveRequest {
uri : uri ,
// GetArchiveName returns the name of the caller, based on the ref used by the
repo : ctx . Repo . GitRepo ,
// caller to create this request.
func (aReq *ArchiveRequest) GetArchiveName() string {
	// Every "/" in the ref name is replaced by "-" before it is used as the
	// base of the archive name.
	base := strings.ReplaceAll(aReq.refName, "/", "-")
	// The suffix is whatever the archive type's String method yields.
	suffix := aReq.Type.String()
	return base + "." + suffix
}
}
switch {
func doArchive ( r * ArchiveRequest ) ( * models . RepoArchiver , error ) {
case strings . HasSuffix ( uri , ".zip" ) :
ctx , commiter , err := models . TxDBContext ( )
r . ext = ".zip"
if err != nil {
r . archivePath = path . Join ( r . repo . Path , "archives/zip" )
return nil , err
r . archiveType = git . ZIP
case strings . HasSuffix ( uri , ".tar.gz" ) :
r . ext = ".tar.gz"
r . archivePath = path . Join ( r . repo . Path , "archives/targz" )
r . archiveType = git . TARGZ
default :
log . Trace ( "Unknown format: %s" , uri )
return nil
}
}
defer commiter . Close ( )
r . refName = strings . TrimSuffix ( r . uri , r . ext )
archiver , err := models . GetRepoArchiver ( ctx , r . RepoID , r . Type , r . CommitID )
isDir , err := util . IsDir ( r . archivePath )
if err != nil {
if err != nil {
ctx . ServerError ( "Download -> util.IsDir(archivePath)" , err )
return nil , err
return nil
}
}
if ! isDir {
if err := os . MkdirAll ( r . archivePath , os . ModePerm ) ; err != nil {
if archiver != nil {
ctx . ServerError ( "Download -> os.MkdirAll(archivePath)" , err )
// FIXME: If another process is generating it, we treat it as not ready and just return
return nil
// Or we should wait until the archive is generated.
if archiver . Status == models . RepoArchiverGenerating {
return nil , nil
}
} else {
archiver = & models . RepoArchiver {
RepoID : r . RepoID ,
Type : r . Type ,
CommitID : r . CommitID ,
Status : models . RepoArchiverGenerating ,
}
if err := models . AddRepoArchiver ( ctx , archiver ) ; err != nil {
return nil , err
}
}
}
}
// Get corresponding commit.
rPath , err := archiver . RelativePath ( )
if r . repo . IsBranchExist ( r . refName ) {
r . commit , err = r . repo . GetBranchCommit ( r . refName )
if err != nil {
if err != nil {
ctx . ServerError ( "GetBranchCommit" , err )
return nil , err
return nil
}
}
} else if r . repo . IsTagExist ( r . refName ) {
r . commit , err = r . repo . GetTagCommit ( r . refName )
_ , err = storage . RepoArchives . Stat ( rPath )
if err != nil {
if err == nil {
ctx . ServerError ( "GetTagCommit" , err )
if archiver . Status == models . RepoArchiverGenerating {
return nil
archiver . Status = models . RepoArchiverReady
return archiver , models . UpdateRepoArchiverStatus ( ctx , archiver )
}
}
} else if shaRegex . MatchString ( r . refName ) {
return archiver , nil
r . commit , err = r . repo . GetCommit ( r . refName )
if err != nil {
ctx . NotFound ( "GetCommit" , nil )
return nil
}
}
} else {
ctx . NotFound ( "DeriveRequestFrom" , nil )
if ! errors . Is ( err , os . ErrNotExist ) {
return nil
return nil , fmt . Errorf ( "unable to stat archive: %v" , err )
}
}
archiveMutex . Lock ( )
rd , w := io . Pipe ( )
defer archiveMutex . Unlock ( )
defer func ( ) {
if rExisting := getArchiveRequest ( r . repo , r . commit , r . archiveType ) ; rExisting != nil {
w . Close ( )
return rExisting
rd . Close ( )
} ( )
var done = make ( chan error )
repo , err := archiver . LoadRepo ( )
if err != nil {
return nil , fmt . Errorf ( "archiver.LoadRepo failed: %v" , err )
}
}
r . archivePath = path . Join ( r . archivePath , base . ShortSha ( r . commit . ID . String ( ) ) + r . ext )
gitRepo , err := git . OpenRepository ( repo . RepoPath ( ) )
r . archiveComplete , err = util . IsFile ( r . archivePath )
if err != nil {
if err != nil {
ctx . ServerError ( "util.IsFile" , err )
return nil , err
return nil
}
}
return r
defer gitRepo . Close ( )
go func ( done chan error , w * io . PipeWriter , archiver * models . RepoArchiver , gitRepo * git . Repository ) {
defer func ( ) {
if r := recover ( ) ; r != nil {
done <- fmt . Errorf ( "%v" , r )
}
}
} ( )
func doArchive ( r * ArchiveRequest ) {
err = gitRepo . CreateArchive (
var (
graceful . GetManager ( ) . ShutdownContext ( ) ,
err error
archiver . Type ,
tmpArchive * os . File
w ,
destArchive * os . File
setting . Repository . PrefixArchiveFiles ,
archiver . CommitID ,
)
)
_ = w . CloseWithError ( err )
done <- err
} ( done , w , archiver , gitRepo )
// Close the channel to indicate to potential waiters that this request
// TODO: add lfs data to zip
// has finished.
// TODO: add submodule data to zip
defer close ( r . cchan )
// It could have happened that we enqueued two archival requests, due to
if _ , err := storage . RepoArchives . Save ( rPath , rd , - 1 ) ; err != nil {
// race conditions and difficulties in locking. Do one last check that
return nil , fmt . Errorf ( "unable to write archive: %v" , err )
// the archive we're referring to doesn't already exist. If it does exist,
// then just mark the request as complete and move on.
isFile , err := util . IsFile ( r . archivePath )
if err != nil {
log . Error ( "Unable to check if %s util.IsFile: %v. Will ignore and recreate." , r . archivePath , err )
}
if isFile {
r . archiveComplete = true
return
}
}
// Create a temporary file to use while the archive is being built. We
err = <- done
// will then copy it into place (r.archivePath) once it's fully
// constructed.
tmpArchive , err = ioutil . TempFile ( "" , "archive" )
if err != nil {
if err != nil {
log . Error ( "Unable to create a temporary archive file! Error: %v" , err )
return nil , err
return
}
defer func ( ) {
tmpArchive . Close ( )
os . Remove ( tmpArchive . Name ( ) )
} ( )
if err = r . commit . CreateArchive ( graceful . GetManager ( ) . ShutdownContext ( ) , tmpArchive . Name ( ) , git . CreateArchiveOpts {
Format : r . archiveType ,
Prefix : setting . Repository . PrefixArchiveFiles ,
} ) ; err != nil {
log . Error ( "Download -> CreateArchive " + tmpArchive . Name ( ) , err )
return
}
}
// Now we copy it into place
if archiver . Status == models . RepoArchiverGenerating {
if destArchive , err = os . Create ( r . archivePath ) ; err != nil {
archiver . Status = models . RepoArchiverReady
log . Error ( "Unable to open archive " + r . archivePath )
if err = models . UpdateRepoArchiverStatus ( ctx , archiver ) ; err != nil {
return
return nil , err
}
}
_ , err = io . Copy ( destArchive , tmpArchive )
destArchive . Close ( )
if err != nil {
log . Error ( "Unable to write archive " + r . archivePath )
return
}
}
// Block any attempt to finalize creating a new request if we're marking
return archiver , commiter . Commit ( )
r . archiveComplete = true
}
}
// ArchiveRepository satisfies the ArchiveRequest being passed in. Processing
// ArchiveRepository satisfies the ArchiveRequest being passed in. Processing
@ -255,65 +204,46 @@ func doArchive(r *ArchiveRequest) {
// anything. In all cases, the caller should be examining the *ArchiveRequest
// anything. In all cases, the caller should be examining the *ArchiveRequest
// being returned for completion, as it may be different than the one they passed
// being returned for completion, as it may be different than the one they passed
// in.
// in.
func ArchiveRepository ( request * ArchiveRequest ) * ArchiveRequest {
func ArchiveRepository ( request * ArchiveRequest ) ( * models . RepoArchiver , error ) {
// We'll return the request that's already been enqueued if it has been
return doArchive ( request )
// enqueued, or we'll immediately enqueue it if it has not been enqueued
}
// and it is not marked complete.
archiveMutex . Lock ( )
var archiverQueue queue . UniqueQueue
defer archiveMutex . Unlock ( )
if rExisting := getArchiveRequest ( request . repo , request . commit , request . archiveType ) ; rExisting != nil {
// Init initializes the archive queue
return rExisting
func Init ( ) error {
}
handler := func ( data ... queue . Data ) {
if request . archiveComplete {
for _ , datum := range data {
return request
archiveReq , ok := datum . ( * ArchiveRequest )
}
if ! ok {
log . Error ( "Unable to process provided datum: %v - not possible to cast to IndexerData" , datum )
request . cchan = make ( chan struct { } )
continue
archiveInProgress = append ( archiveInProgress , request )
}
go func ( ) {
log . Trace ( "ArchiverData Process: %#v" , archiveReq )
// Wait to start, if we have the Cond for it. This is currently only
if _ , err := doArchive ( archiveReq ) ; err != nil {
// useful for testing, so that the start and release of queued entries
log . Error ( "Archive %v faild: %v" , datum , err )
// can be controlled to examine the queue.
}
if archiveQueueStartCond != nil {
}
archiveQueueMutex . Lock ( )
}
archiveQueueStartCond . Wait ( )
archiveQueueMutex . Unlock ( )
archiverQueue = queue . CreateUniqueQueue ( "repo-archive" , handler , new ( ArchiveRequest ) )
}
if archiverQueue == nil {
return errors . New ( "unable to create codes indexer queue" )
// Drop the mutex while we process the request. This may take a long
}
// time, and it's not necessary now that we've added the request to
// archiveInProgress.
doArchive ( request )
if archiveQueueReleaseCond != nil {
archiveQueueMutex . Lock ( )
archiveQueueReleaseCond . Wait ( )
archiveQueueMutex . Unlock ( )
}
// Purge this request from the list. To do so, we'll just take the
// index at which we ended up at and swap the final element into that
// position, then chop off the now-redundant final element. The slice
// may have changed in between these two segments and we may have moved,
// so we search for it here. We could perhaps avoid this search
// entirely if len(archiveInProgress) == 1, but we should verify
// correctness.
archiveMutex . Lock ( )
defer archiveMutex . Unlock ( )
idx := - 1
for _idx , req := range archiveInProgress {
if req == request {
idx = _idx
break
}
}
if idx == - 1 {
log . Error ( "ArchiveRepository: Failed to find request for removal." )
return
}
archiveInProgress = append ( archiveInProgress [ : idx ] , archiveInProgress [ idx + 1 : ] ... )
} ( )
return request
go graceful . GetManager ( ) . RunWithShutdownFns ( archiverQueue . Run )
return nil
}
// StartArchive pushes the archive request onto the archiver queue.
//
// The queue is first asked whether it already has the request. An error
// from that lookup is returned as-is; a request the queue already has is
// left alone and nil is returned; otherwise the result of pushing the
// request is returned.
func StartArchive(request *ArchiveRequest) error {
	switch queued, err := archiverQueue.Has(request); {
	case err != nil:
		return err
	case queued:
		return nil
	default:
		return archiverQueue.Push(request)
	}
}
}