gitea source for verification 2026-05-22
Some checks are pending
release-nightly / nightly-binary (push) Waiting to run
release-nightly / nightly-docker-rootful (push) Waiting to run
release-nightly / nightly-docker-rootless (push) Waiting to run

This commit is contained in:
2026-05-22 16:44:59 +08:00
commit 7a61cd3abc
5650 changed files with 690128 additions and 0 deletions

213
models/actions/artifact.go Normal file
View File

@@ -0,0 +1,213 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
// This artifact server is inspired by https://github.com/nektos/act/blob/master/pkg/artifacts/server.go.
// It updates url setting and uses ObjectStore to handle artifacts persistence.
package actions
import (
"context"
"errors"
"time"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
"xorm.io/builder"
)
// ArtifactStatus is the status of an artifact, uploading, expired or need-delete
type ArtifactStatus int64
const (
ArtifactStatusUploadPending ArtifactStatus = iota + 1 // 1 ArtifactStatusUploadPending is the status of an artifact upload that is pending
ArtifactStatusUploadConfirmed // 2 ArtifactStatusUploadConfirmed is the status of an artifact upload that is confirmed
ArtifactStatusUploadError // 3 ArtifactStatusUploadError is the status of an artifact upload that is errored
ArtifactStatusExpired // 4, ArtifactStatusExpired is the status of an artifact that is expired
ArtifactStatusPendingDeletion // 5, ArtifactStatusPendingDeletion is the status of an artifact that is pending deletion
ArtifactStatusDeleted // 6, ArtifactStatusDeleted is the status of an artifact that is deleted
)
func (status ArtifactStatus) ToString() string {
switch status {
case ArtifactStatusUploadPending:
return "upload is not yet completed"
case ArtifactStatusUploadConfirmed:
return "upload is completed"
case ArtifactStatusUploadError:
return "upload failed"
case ArtifactStatusExpired:
return "expired"
case ArtifactStatusPendingDeletion:
return "pending deletion"
case ArtifactStatusDeleted:
return "deleted"
default:
return "unknown"
}
}
func init() {
db.RegisterModel(new(ActionArtifact))
}
// ActionArtifact is a file that is stored in the artifact storage.
type ActionArtifact struct {
ID int64 `xorm:"pk autoincr"`
RunID int64 `xorm:"index unique(runid_name_path)"` // The run id of the artifact
RunnerID int64
RepoID int64 `xorm:"index"`
OwnerID int64
CommitSHA string
StoragePath string // The path to the artifact in the storage
FileSize int64 // The size of the artifact in bytes
FileCompressedSize int64 // The size of the artifact in bytes after gzip compression
ContentEncoding string // The content encoding of the artifact
ArtifactPath string `xorm:"index unique(runid_name_path)"` // The path to the artifact when runner uploads it
ArtifactName string `xorm:"index unique(runid_name_path)"` // The name of the artifact when runner uploads it
Status ArtifactStatus `xorm:"index"` // The status of the artifact, uploading, expired or need-delete
CreatedUnix timeutil.TimeStamp `xorm:"created"`
UpdatedUnix timeutil.TimeStamp `xorm:"updated index"`
ExpiredUnix timeutil.TimeStamp `xorm:"index"` // The time when the artifact will be expired
}
func CreateArtifact(ctx context.Context, t *ActionTask, artifactName, artifactPath string, expiredDays int64) (*ActionArtifact, error) {
if err := t.LoadJob(ctx); err != nil {
return nil, err
}
artifact, err := getArtifactByNameAndPath(ctx, t.Job.RunID, artifactName, artifactPath)
if errors.Is(err, util.ErrNotExist) {
artifact := &ActionArtifact{
ArtifactName: artifactName,
ArtifactPath: artifactPath,
RunID: t.Job.RunID,
RunnerID: t.RunnerID,
RepoID: t.RepoID,
OwnerID: t.OwnerID,
CommitSHA: t.CommitSHA,
Status: ArtifactStatusUploadPending,
ExpiredUnix: timeutil.TimeStamp(time.Now().Unix() + timeutil.Day*expiredDays),
}
if _, err := db.GetEngine(ctx).Insert(artifact); err != nil {
return nil, err
}
return artifact, nil
} else if err != nil {
return nil, err
}
if _, err := db.GetEngine(ctx).ID(artifact.ID).Cols("expired_unix").Update(&ActionArtifact{
ExpiredUnix: timeutil.TimeStamp(time.Now().Unix() + timeutil.Day*expiredDays),
}); err != nil {
return nil, err
}
return artifact, nil
}
func getArtifactByNameAndPath(ctx context.Context, runID int64, name, fpath string) (*ActionArtifact, error) {
var art ActionArtifact
has, err := db.GetEngine(ctx).Where("run_id = ? AND artifact_name = ? AND artifact_path = ?", runID, name, fpath).Get(&art)
if err != nil {
return nil, err
} else if !has {
return nil, util.ErrNotExist
}
return &art, nil
}
// UpdateArtifactByID updates an artifact by id
func UpdateArtifactByID(ctx context.Context, id int64, art *ActionArtifact) error {
art.ID = id
_, err := db.GetEngine(ctx).ID(id).AllCols().Update(art)
return err
}
type FindArtifactsOptions struct {
db.ListOptions
RepoID int64
RunID int64
ArtifactName string
Status int
FinalizedArtifactsV4 bool
}
func (opts FindArtifactsOptions) ToOrders() string {
return "id"
}
var _ db.FindOptionsOrder = (*FindArtifactsOptions)(nil)
func (opts FindArtifactsOptions) ToConds() builder.Cond {
cond := builder.NewCond()
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
}
if opts.RunID > 0 {
cond = cond.And(builder.Eq{"run_id": opts.RunID})
}
if opts.ArtifactName != "" {
cond = cond.And(builder.Eq{"artifact_name": opts.ArtifactName})
}
if opts.Status > 0 {
cond = cond.And(builder.Eq{"status": opts.Status})
}
if opts.FinalizedArtifactsV4 {
cond = cond.And(builder.Eq{"status": ArtifactStatusUploadConfirmed}.Or(builder.Eq{"status": ArtifactStatusExpired}))
cond = cond.And(builder.Eq{"content_encoding": "application/zip"})
}
return cond
}
// ActionArtifactMeta is the meta-data of an artifact
type ActionArtifactMeta struct {
ArtifactName string
FileSize int64
Status ArtifactStatus
}
// ListUploadedArtifactsMeta returns all uploaded artifacts meta of a run
func ListUploadedArtifactsMeta(ctx context.Context, runID int64) ([]*ActionArtifactMeta, error) {
arts := make([]*ActionArtifactMeta, 0, 10)
return arts, db.GetEngine(ctx).Table("action_artifact").
Where("run_id=? AND (status=? OR status=?)", runID, ArtifactStatusUploadConfirmed, ArtifactStatusExpired).
GroupBy("artifact_name").
Select("artifact_name, sum(file_size) as file_size, max(status) as status").
Find(&arts)
}
// ListNeedExpiredArtifacts returns all need expired artifacts but not deleted
func ListNeedExpiredArtifacts(ctx context.Context) ([]*ActionArtifact, error) {
arts := make([]*ActionArtifact, 0, 10)
return arts, db.GetEngine(ctx).
Where("expired_unix < ? AND status = ?", timeutil.TimeStamp(time.Now().Unix()), ArtifactStatusUploadConfirmed).Find(&arts)
}
// ListPendingDeleteArtifacts returns all artifacts in pending-delete status.
// limit is the max number of artifacts to return.
func ListPendingDeleteArtifacts(ctx context.Context, limit int) ([]*ActionArtifact, error) {
arts := make([]*ActionArtifact, 0, limit)
return arts, db.GetEngine(ctx).
Where("status = ?", ArtifactStatusPendingDeletion).Limit(limit).Find(&arts)
}
// SetArtifactExpired sets an artifact to expired
func SetArtifactExpired(ctx context.Context, artifactID int64) error {
_, err := db.GetEngine(ctx).Where("id=? AND status = ?", artifactID, ArtifactStatusUploadConfirmed).Cols("status").Update(&ActionArtifact{Status: ArtifactStatusExpired})
return err
}
// SetArtifactNeedDelete sets an artifact to need-delete, cron job will delete it
func SetArtifactNeedDelete(ctx context.Context, runID int64, name string) error {
_, err := db.GetEngine(ctx).Where("run_id=? AND artifact_name=? AND status = ?", runID, name, ArtifactStatusUploadConfirmed).Cols("status").Update(&ActionArtifact{Status: ArtifactStatusPendingDeletion})
return err
}
// SetArtifactDeleted sets an artifact to deleted
func SetArtifactDeleted(ctx context.Context, artifactID int64) error {
_, err := db.GetEngine(ctx).ID(artifactID).Cols("status").Update(&ActionArtifact{Status: ArtifactStatusDeleted})
return err
}

View File

@@ -0,0 +1,20 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"testing"
"code.gitea.io/gitea/models/unittest"
)
func TestMain(m *testing.M) {
unittest.MainTest(m, &unittest.TestOptions{
FixtureFiles: []string{
"action_runner_token.yml",
"action_run.yml",
"repository.yml",
},
})
}

444
models/actions/run.go Normal file
View File

@@ -0,0 +1,444 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"errors"
"fmt"
"slices"
"strings"
"time"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/setting"
api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
webhook_module "code.gitea.io/gitea/modules/webhook"
"github.com/nektos/act/pkg/jobparser"
"xorm.io/builder"
)
// ActionRun represents a run of a workflow file
type ActionRun struct {
ID int64
Title string
RepoID int64 `xorm:"index unique(repo_index)"`
Repo *repo_model.Repository `xorm:"-"`
OwnerID int64 `xorm:"index"`
WorkflowID string `xorm:"index"` // the name of workflow file
Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository
TriggerUserID int64 `xorm:"index"`
TriggerUser *user_model.User `xorm:"-"`
ScheduleID int64
Ref string `xorm:"index"` // the commit/tag/… that caused the run
IsRefDeleted bool `xorm:"-"`
CommitSHA string
IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow.
NeedApproval bool // may need approval if it's a fork pull request
ApprovedBy int64 `xorm:"index"` // who approved
Event webhook_module.HookEventType // the webhook event that causes the workflow to run
EventPayload string `xorm:"LONGTEXT"`
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
// PreviousDuration is used for recording previous duration
PreviousDuration time.Duration
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
func init() {
db.RegisterModel(new(ActionRun))
db.RegisterModel(new(ActionRunIndex))
}
func (run *ActionRun) HTMLURL() string {
if run.Repo == nil {
return ""
}
return fmt.Sprintf("%s/actions/runs/%d", run.Repo.HTMLURL(), run.Index)
}
func (run *ActionRun) Link() string {
if run.Repo == nil {
return ""
}
return fmt.Sprintf("%s/actions/runs/%d", run.Repo.Link(), run.Index)
}
func (run *ActionRun) WorkflowLink() string {
if run.Repo == nil {
return ""
}
return fmt.Sprintf("%s/actions/?workflow=%s", run.Repo.Link(), run.WorkflowID)
}
// RefLink return the url of run's ref
func (run *ActionRun) RefLink() string {
refName := git.RefName(run.Ref)
if refName.IsPull() {
return run.Repo.Link() + "/pulls/" + refName.ShortName()
}
return run.Repo.Link() + "/src/" + refName.RefWebLinkPath()
}
// PrettyRef return #id for pull ref or ShortName for others
func (run *ActionRun) PrettyRef() string {
refName := git.RefName(run.Ref)
if refName.IsPull() {
return "#" + strings.TrimSuffix(strings.TrimPrefix(run.Ref, git.PullPrefix), "/head")
}
return refName.ShortName()
}
// LoadAttributes load Repo TriggerUser if not loaded
func (run *ActionRun) LoadAttributes(ctx context.Context) error {
if run == nil {
return nil
}
if err := run.LoadRepo(ctx); err != nil {
return err
}
if err := run.Repo.LoadAttributes(ctx); err != nil {
return err
}
if run.TriggerUser == nil {
u, err := user_model.GetPossibleUserByID(ctx, run.TriggerUserID)
if err != nil {
return err
}
run.TriggerUser = u
}
return nil
}
func (run *ActionRun) LoadRepo(ctx context.Context) error {
if run == nil || run.Repo != nil {
return nil
}
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
if err != nil {
return err
}
run.Repo = repo
return nil
}
func (run *ActionRun) Duration() time.Duration {
return calculateDuration(run.Started, run.Stopped, run.Status) + run.PreviousDuration
}
func (run *ActionRun) GetPushEventPayload() (*api.PushPayload, error) {
if run.Event == webhook_module.HookEventPush {
var payload api.PushPayload
if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil {
return nil, err
}
return &payload, nil
}
return nil, fmt.Errorf("event %s is not a push event", run.Event)
}
func (run *ActionRun) GetPullRequestEventPayload() (*api.PullRequestPayload, error) {
if run.Event.IsPullRequest() {
var payload api.PullRequestPayload
if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil {
return nil, err
}
return &payload, nil
}
return nil, fmt.Errorf("event %s is not a pull request event", run.Event)
}
func (run *ActionRun) GetWorkflowRunEventPayload() (*api.WorkflowRunPayload, error) {
if run.Event == webhook_module.HookEventWorkflowRun {
var payload api.WorkflowRunPayload
if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil {
return nil, err
}
return &payload, nil
}
return nil, fmt.Errorf("event %s is not a workflow run event", run.Event)
}
func (run *ActionRun) IsSchedule() bool {
return run.ScheduleID > 0
}
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
_, err := db.GetEngine(ctx).ID(repo.ID).
NoAutoTime().
Cols("num_action_runs", "num_closed_action_runs").
SetExpr("num_action_runs",
builder.Select("count(*)").From("action_run").
Where(builder.Eq{"repo_id": repo.ID}),
).
SetExpr("num_closed_action_runs",
builder.Select("count(*)").From("action_run").
Where(builder.Eq{
"repo_id": repo.ID,
}.And(
builder.In("status",
StatusSuccess,
StatusFailure,
StatusCancelled,
StatusSkipped,
),
),
),
).
Update(repo)
return err
}
// CancelPreviousJobs cancels all previous jobs of the same repository, reference, workflow, and event.
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) ([]*ActionRunJob, error) {
// Find all runs in the specified repository, reference, and workflow with non-final status
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
RepoID: repoID,
Ref: ref,
WorkflowID: workflowID,
TriggerEvent: event,
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
})
if err != nil {
return nil, err
}
// If there are no runs found, there's no need to proceed with cancellation, so return nil.
if total == 0 {
return nil, nil
}
cancelledJobs := make([]*ActionRunJob, 0, total)
// Iterate over each found run and cancel its associated jobs.
for _, run := range runs {
// Find all jobs associated with the current run.
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return cancelledJobs, err
}
// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()
// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return cancelledJobs, err
}
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return cancelledJobs, errors.New("job has changed, try again")
}
cancelledJobs = append(cancelledJobs, job)
// Continue with the next job.
continue
}
// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return cancelledJobs, err
}
cancelledJobs = append(cancelledJobs, job)
}
}
// Return nil to indicate successful cancellation of all running and waiting jobs.
return cancelledJobs, nil
}
// InsertRun inserts a run
// The title will be cut off at 255 characters if it's longer than 255 characters.
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
return db.WithTx(ctx, func(ctx context.Context) error {
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
if err != nil {
return err
}
run.Index = index
run.Title = util.EllipsisDisplayString(run.Title, 255)
if err := db.Insert(ctx, run); err != nil {
return err
}
if run.Repo == nil {
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
if err != nil {
return err
}
run.Repo = repo
}
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
runJobs := make([]*ActionRunJob, 0, len(jobs))
var hasWaiting bool
for _, v := range jobs {
id, job := v.Job()
needs := job.Needs()
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
return err
}
payload, _ := v.Marshal()
status := StatusWaiting
if len(needs) > 0 || run.NeedApproval {
status = StatusBlocked
} else {
hasWaiting = true
}
job.Name = util.EllipsisDisplayString(job.Name, 255)
runJobs = append(runJobs, &ActionRunJob{
RunID: run.ID,
RepoID: run.RepoID,
OwnerID: run.OwnerID,
CommitSHA: run.CommitSHA,
IsForkPullRequest: run.IsForkPullRequest,
Name: job.Name,
WorkflowPayload: payload,
JobID: id,
Needs: needs,
RunsOn: job.RunsOn(),
Status: status,
})
}
if err := db.Insert(ctx, runJobs); err != nil {
return err
}
// if there is a job in the waiting status, increase tasks version.
if hasWaiting {
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
return err
}
}
return nil
})
}
func GetRunByRepoAndID(ctx context.Context, repoID, runID int64) (*ActionRun, error) {
var run ActionRun
has, err := db.GetEngine(ctx).Where("id=? AND repo_id=?", runID, repoID).Get(&run)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run with id %d: %w", runID, util.ErrNotExist)
}
return &run, nil
}
func GetRunByIndex(ctx context.Context, repoID, index int64) (*ActionRun, error) {
run := &ActionRun{
RepoID: repoID,
Index: index,
}
has, err := db.GetEngine(ctx).Get(run)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run with index %d %d: %w", repoID, index, util.ErrNotExist)
}
return run, nil
}
func GetLatestRun(ctx context.Context, repoID int64) (*ActionRun, error) {
run := &ActionRun{
RepoID: repoID,
}
has, err := db.GetEngine(ctx).Where("repo_id=?", repoID).Desc("index").Get(run)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("latest run with repo_id %d: %w", repoID, util.ErrNotExist)
}
return run, nil
}
func GetWorkflowLatestRun(ctx context.Context, repoID int64, workflowFile, branch, event string) (*ActionRun, error) {
var run ActionRun
q := db.GetEngine(ctx).Where("repo_id=?", repoID).
And("ref = ?", branch).
And("workflow_id = ?", workflowFile)
if event != "" {
q.And("event = ?", event)
}
has, err := q.Desc("id").Get(&run)
if err != nil {
return nil, err
} else if !has {
return nil, util.NewNotExistErrorf("run with repo_id %d, ref %s, workflow_id %s", repoID, branch, workflowFile)
}
return &run, nil
}
// UpdateRun updates a run.
// It requires the inputted run has Version set.
// It will return error if the version is not matched (it means the run has been changed after loaded).
func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
sess := db.GetEngine(ctx).ID(run.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
run.Title = util.EllipsisDisplayString(run.Title, 255)
affected, err := sess.Update(run)
if err != nil {
return err
}
if affected == 0 {
return errors.New("run has changed")
// It's impossible that the run is not found, since Gitea never deletes runs.
}
if run.Status != 0 || slices.Contains(cols, "status") {
if run.RepoID == 0 {
setting.PanicInDevOrTesting("RepoID should not be 0")
}
if err = run.LoadRepo(ctx); err != nil {
return err
}
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
}
return nil
}
type ActionRunIndex db.ResourceIndex

199
models/actions/run_job.go Normal file
View File

@@ -0,0 +1,199 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"fmt"
"slices"
"time"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
"xorm.io/builder"
)
// ActionRunJob represents a job of a run
type ActionRunJob struct {
ID int64
RunID int64 `xorm:"index"`
Run *ActionRun `xorm:"-"`
RepoID int64 `xorm:"index"`
Repo *repo_model.Repository `xorm:"-"`
OwnerID int64 `xorm:"index"`
CommitSHA string `xorm:"index"`
IsForkPullRequest bool
Name string `xorm:"VARCHAR(255)"`
Attempt int64
WorkflowPayload []byte
JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
Needs []string `xorm:"JSON TEXT"`
RunsOn []string `xorm:"JSON TEXT"`
TaskID int64 // the latest task of the job
Status Status `xorm:"index"`
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`
}
func init() {
db.RegisterModel(new(ActionRunJob))
}
func (job *ActionRunJob) Duration() time.Duration {
return calculateDuration(job.Started, job.Stopped, job.Status)
}
func (job *ActionRunJob) LoadRun(ctx context.Context) error {
if job.Run == nil {
run, err := GetRunByRepoAndID(ctx, job.RepoID, job.RunID)
if err != nil {
return err
}
job.Run = run
}
return nil
}
func (job *ActionRunJob) LoadRepo(ctx context.Context) error {
if job.Repo == nil {
repo, err := repo_model.GetRepositoryByID(ctx, job.RepoID)
if err != nil {
return err
}
job.Repo = repo
}
return nil
}
// LoadAttributes load Run if not loaded
func (job *ActionRunJob) LoadAttributes(ctx context.Context) error {
if job == nil {
return nil
}
if err := job.LoadRun(ctx); err != nil {
return err
}
return job.Run.LoadAttributes(ctx)
}
func GetRunJobByID(ctx context.Context, id int64) (*ActionRunJob, error) {
var job ActionRunJob
has, err := db.GetEngine(ctx).Where("id=?", id).Get(&job)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run job with id %d: %w", id, util.ErrNotExist)
}
return &job, nil
}
func GetRunJobsByRunID(ctx context.Context, runID int64) (ActionJobList, error) {
var jobs []*ActionRunJob
if err := db.GetEngine(ctx).Where("run_id=?", runID).OrderBy("id").Find(&jobs); err != nil {
return nil, err
}
return jobs, nil
}
func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) {
e := db.GetEngine(ctx)
sess := e.ID(job.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
if cond != nil {
sess.Where(cond)
}
affected, err := sess.Update(job)
if err != nil {
return 0, err
}
if affected == 0 || (!slices.Contains(cols, "status") && job.Status == 0) {
return affected, nil
}
if affected != 0 && slices.Contains(cols, "status") && job.Status.IsWaiting() {
// if the status of job changes to waiting again, increase tasks version.
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
return 0, err
}
}
if job.RunID == 0 {
var err error
if job, err = GetRunJobByID(ctx, job.ID); err != nil {
return 0, err
}
}
{
// Other goroutines may aggregate the status of the run and update it too.
// So we need load the run and its jobs before updating the run.
run, err := GetRunByRepoAndID(ctx, job.RepoID, job.RunID)
if err != nil {
return 0, err
}
jobs, err := GetRunJobsByRunID(ctx, job.RunID)
if err != nil {
return 0, err
}
run.Status = AggregateJobStatus(jobs)
if run.Started.IsZero() && run.Status.IsRunning() {
run.Started = timeutil.TimeStampNow()
}
if run.Stopped.IsZero() && run.Status.IsDone() {
run.Stopped = timeutil.TimeStampNow()
}
if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil {
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
}
}
return affected, nil
}
func AggregateJobStatus(jobs []*ActionRunJob) Status {
allSuccessOrSkipped := len(jobs) != 0
allSkipped := len(jobs) != 0
var hasFailure, hasCancelled, hasWaiting, hasRunning, hasBlocked bool
for _, job := range jobs {
allSuccessOrSkipped = allSuccessOrSkipped && (job.Status == StatusSuccess || job.Status == StatusSkipped)
allSkipped = allSkipped && job.Status == StatusSkipped
hasFailure = hasFailure || job.Status == StatusFailure
hasCancelled = hasCancelled || job.Status == StatusCancelled
hasWaiting = hasWaiting || job.Status == StatusWaiting
hasRunning = hasRunning || job.Status == StatusRunning
hasBlocked = hasBlocked || job.Status == StatusBlocked
}
switch {
case allSkipped:
return StatusSkipped
case allSuccessOrSkipped:
return StatusSuccess
case hasCancelled:
return StatusCancelled
case hasRunning:
return StatusRunning
case hasWaiting:
return StatusWaiting
case hasFailure:
return StatusFailure
case hasBlocked:
return StatusBlocked
default:
return StatusUnknown // it shouldn't happen
}
}

View File

@@ -0,0 +1,110 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/timeutil"
"xorm.io/builder"
)
type ActionJobList []*ActionRunJob
func (jobs ActionJobList) GetRunIDs() []int64 {
return container.FilterSlice(jobs, func(j *ActionRunJob) (int64, bool) {
return j.RunID, j.RunID != 0
})
}
func (jobs ActionJobList) LoadRepos(ctx context.Context) error {
repoIDs := container.FilterSlice(jobs, func(j *ActionRunJob) (int64, bool) {
return j.RepoID, j.RepoID != 0 && j.Repo == nil
})
if len(repoIDs) == 0 {
return nil
}
repos := make(map[int64]*repo_model.Repository, len(repoIDs))
if err := db.GetEngine(ctx).In("id", repoIDs).Find(&repos); err != nil {
return err
}
for _, j := range jobs {
if j.RepoID > 0 && j.Repo == nil {
j.Repo = repos[j.RepoID]
}
}
return nil
}
func (jobs ActionJobList) LoadRuns(ctx context.Context, withRepo bool) error {
if withRepo {
if err := jobs.LoadRepos(ctx); err != nil {
return err
}
}
runIDs := jobs.GetRunIDs()
runs := make(map[int64]*ActionRun, len(runIDs))
if err := db.GetEngine(ctx).In("id", runIDs).Find(&runs); err != nil {
return err
}
for _, j := range jobs {
if j.RunID > 0 && j.Run == nil {
j.Run = runs[j.RunID]
j.Run.Repo = j.Repo
}
}
return nil
}
func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) error {
return jobs.LoadRuns(ctx, withRepo)
}
type FindRunJobOptions struct {
db.ListOptions
RunID int64
RepoID int64
OwnerID int64
CommitSHA string
Statuses []Status
UpdatedBefore timeutil.TimeStamp
}
func (opts FindRunJobOptions) ToConds() builder.Cond {
cond := builder.NewCond()
if opts.RunID > 0 {
cond = cond.And(builder.Eq{"`action_run_job`.run_id": opts.RunID})
}
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"`action_run_job`.repo_id": opts.RepoID})
}
if opts.CommitSHA != "" {
cond = cond.And(builder.Eq{"`action_run_job`.commit_sha": opts.CommitSHA})
}
if len(opts.Statuses) > 0 {
cond = cond.And(builder.In("`action_run_job`.status", opts.Statuses))
}
if opts.UpdatedBefore > 0 {
cond = cond.And(builder.Lt{"`action_run_job`.updated": opts.UpdatedBefore})
}
return cond
}
func (opts FindRunJobOptions) ToJoins() []db.JoinFunc {
if opts.OwnerID > 0 {
return []db.JoinFunc{
func(sess db.Engine) error {
sess.Join("INNER", "repository", "repository.id = repo_id AND repository.owner_id = ?", opts.OwnerID)
return nil
},
}
}
return nil
}

View File

@@ -0,0 +1,85 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestAggregateJobStatus(t *testing.T) {
testStatuses := func(expected Status, statuses []Status) {
t.Helper()
var jobs []*ActionRunJob
for _, v := range statuses {
jobs = append(jobs, &ActionRunJob{Status: v})
}
actual := AggregateJobStatus(jobs)
if !assert.Equal(t, expected, actual) {
var statusStrings []string
for _, s := range statuses {
statusStrings = append(statusStrings, s.String())
}
t.Errorf("AggregateJobStatus(%v) = %v, want %v", statusStrings, statusNames[actual], statusNames[expected])
}
}
cases := []struct {
statuses []Status
expected Status
}{
// unknown cases, maybe it shouldn't happen in real world
{[]Status{}, StatusUnknown},
{[]Status{StatusUnknown, StatusSuccess}, StatusUnknown},
{[]Status{StatusUnknown, StatusSkipped}, StatusUnknown},
{[]Status{StatusUnknown, StatusFailure}, StatusFailure},
{[]Status{StatusUnknown, StatusCancelled}, StatusCancelled},
{[]Status{StatusUnknown, StatusWaiting}, StatusWaiting},
{[]Status{StatusUnknown, StatusRunning}, StatusRunning},
{[]Status{StatusUnknown, StatusBlocked}, StatusBlocked},
// success with other status
{[]Status{StatusSuccess}, StatusSuccess},
{[]Status{StatusSuccess, StatusSkipped}, StatusSuccess}, // skipped doesn't affect success
{[]Status{StatusSuccess, StatusFailure}, StatusFailure},
{[]Status{StatusSuccess, StatusCancelled}, StatusCancelled},
{[]Status{StatusSuccess, StatusWaiting}, StatusWaiting},
{[]Status{StatusSuccess, StatusRunning}, StatusRunning},
{[]Status{StatusSuccess, StatusBlocked}, StatusBlocked},
// any cancelled, then cancelled
{[]Status{StatusCancelled}, StatusCancelled},
{[]Status{StatusCancelled, StatusSuccess}, StatusCancelled},
{[]Status{StatusCancelled, StatusSkipped}, StatusCancelled},
{[]Status{StatusCancelled, StatusFailure}, StatusCancelled},
{[]Status{StatusCancelled, StatusWaiting}, StatusCancelled},
{[]Status{StatusCancelled, StatusRunning}, StatusCancelled},
{[]Status{StatusCancelled, StatusBlocked}, StatusCancelled},
// failure with other status, usually fail fast, but "running" wins to match GitHub's behavior
// another reason that we can't make "failure" wins over "running": it would cause a weird behavior that user cannot cancel a workflow or get current running workflows correctly by filter after a job fail.
{[]Status{StatusFailure}, StatusFailure},
{[]Status{StatusFailure, StatusSuccess}, StatusFailure},
{[]Status{StatusFailure, StatusSkipped}, StatusFailure},
{[]Status{StatusFailure, StatusCancelled}, StatusCancelled},
{[]Status{StatusFailure, StatusWaiting}, StatusWaiting},
{[]Status{StatusFailure, StatusRunning}, StatusRunning},
{[]Status{StatusFailure, StatusBlocked}, StatusFailure},
// skipped with other status
// "all skipped" is also considered as "mergeable" by "services/actions.toCommitStatus", the same as GitHub
{[]Status{StatusSkipped}, StatusSkipped},
{[]Status{StatusSkipped, StatusSuccess}, StatusSuccess},
{[]Status{StatusSkipped, StatusFailure}, StatusFailure},
{[]Status{StatusSkipped, StatusCancelled}, StatusCancelled},
{[]Status{StatusSkipped, StatusWaiting}, StatusWaiting},
{[]Status{StatusSkipped, StatusRunning}, StatusRunning},
{[]Status{StatusSkipped, StatusBlocked}, StatusBlocked},
}
for _, c := range cases {
testStatuses(c.expected, c.statuses)
}
}

150
models/actions/run_list.go Normal file
View File

@@ -0,0 +1,150 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/translation"
webhook_module "code.gitea.io/gitea/modules/webhook"
"xorm.io/builder"
)
type RunList []*ActionRun
// GetUserIDs returns a slice of user's id
func (runs RunList) GetUserIDs() []int64 {
return container.FilterSlice(runs, func(run *ActionRun) (int64, bool) {
return run.TriggerUserID, true
})
}
func (runs RunList) GetRepoIDs() []int64 {
return container.FilterSlice(runs, func(run *ActionRun) (int64, bool) {
return run.RepoID, true
})
}
func (runs RunList) LoadTriggerUser(ctx context.Context) error {
userIDs := runs.GetUserIDs()
users := make(map[int64]*user_model.User, len(userIDs))
if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil {
return err
}
for _, run := range runs {
if run.TriggerUserID == user_model.ActionsUserID {
run.TriggerUser = user_model.NewActionsUser()
} else {
run.TriggerUser = users[run.TriggerUserID]
if run.TriggerUser == nil {
run.TriggerUser = user_model.NewGhostUser()
}
}
}
return nil
}
func (runs RunList) LoadRepos(ctx context.Context) error {
repoIDs := runs.GetRepoIDs()
repos, err := repo_model.GetRepositoriesMapByIDs(ctx, repoIDs)
if err != nil {
return err
}
for _, run := range runs {
run.Repo = repos[run.RepoID]
}
return nil
}
type FindRunOptions struct {
db.ListOptions
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
CommitSHA string
}
func (opts FindRunOptions) ToConds() builder.Cond {
cond := builder.NewCond()
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"`action_run`.repo_id": opts.RepoID})
}
if opts.WorkflowID != "" {
cond = cond.And(builder.Eq{"`action_run`.workflow_id": opts.WorkflowID})
}
if opts.TriggerUserID > 0 {
cond = cond.And(builder.Eq{"`action_run`.trigger_user_id": opts.TriggerUserID})
}
if opts.Approved {
cond = cond.And(builder.Gt{"`action_run`.approved_by": 0})
}
if len(opts.Status) > 0 {
cond = cond.And(builder.In("`action_run`.status", opts.Status))
}
if opts.Ref != "" {
cond = cond.And(builder.Eq{"`action_run`.ref": opts.Ref})
}
if opts.TriggerEvent != "" {
cond = cond.And(builder.Eq{"`action_run`.trigger_event": opts.TriggerEvent})
}
if opts.CommitSHA != "" {
cond = cond.And(builder.Eq{"`action_run`.commit_sha": opts.CommitSHA})
}
return cond
}
func (opts FindRunOptions) ToJoins() []db.JoinFunc {
if opts.OwnerID > 0 {
return []db.JoinFunc{func(sess db.Engine) error {
sess.Join("INNER", "repository", "repository.id = repo_id AND repository.owner_id = ?", opts.OwnerID)
return nil
}}
}
return nil
}
func (opts FindRunOptions) ToOrders() string {
return "`action_run`.`id` DESC"
}
type StatusInfo struct {
Status int
DisplayedStatus string
}
// GetStatusInfoList returns a slice of StatusInfo
func GetStatusInfoList(ctx context.Context, lang translation.Locale) []StatusInfo {
// same as those in aggregateJobStatus
allStatus := []Status{StatusSuccess, StatusFailure, StatusWaiting, StatusRunning}
statusInfoList := make([]StatusInfo, 0, 4)
for _, s := range allStatus {
statusInfoList = append(statusInfoList, StatusInfo{
Status: int(s),
DisplayedStatus: s.LocaleString(lang),
})
}
return statusInfoList
}
// GetActors returns a slice of Actors
func GetActors(ctx context.Context, repoID int64) ([]*user_model.User, error) {
actors := make([]*user_model.User, 0, 10)
return actors, db.GetEngine(ctx).Where(builder.In("id", builder.Select("`action_run`.trigger_user_id").From("`action_run`").
GroupBy("`action_run`.trigger_user_id").
Where(builder.Eq{"`action_run`.repo_id": repoID}))).
Cols("id", "name", "full_name", "avatar", "avatar_email", "use_custom_avatar").
OrderBy(user_model.GetOrderByName()).
Find(&actors)
}

View File

@@ -0,0 +1,35 @@
// Copyright 2025 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"testing"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
func TestUpdateRepoRunsNumbers(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
// update the number to a wrong one, the original is 3
_, err := db.GetEngine(t.Context()).ID(4).Cols("num_closed_action_runs").Update(&repo_model.Repository{
NumClosedActionRuns: 2,
})
assert.NoError(t, err)
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 4})
assert.Equal(t, 4, repo.NumActionRuns)
assert.Equal(t, 2, repo.NumClosedActionRuns)
// now update will correct them, only num_actionr_runs and num_closed_action_runs should be updated
err = updateRepoRunsNumbers(t.Context(), repo)
assert.NoError(t, err)
repo = unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 4})
assert.Equal(t, 5, repo.NumActionRuns)
assert.Equal(t, 3, repo.NumClosedActionRuns)
}

388
models/actions/runner.go Normal file
View File

@@ -0,0 +1,388 @@
// Copyright 2021 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"errors"
"fmt"
"strings"
"time"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/shared/types"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/optional"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/translation"
"code.gitea.io/gitea/modules/util"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"xorm.io/builder"
)
// ActionRunner represents runner machines
//
// It can be:
// 1. global runner, OwnerID is 0 and RepoID is 0
// 2. org/user level runner, OwnerID is org/user ID and RepoID is 0
// 3. repo level runner, OwnerID is 0 and RepoID is repo ID
//
// Please note that it's not acceptable to have both OwnerID and RepoID to be non-zero,
// or it will be complicated to find runners belonging to a specific owner.
// For example, conditions like `OwnerID = 1` will also return runner {OwnerID: 1, RepoID: 1},
// but it's a repo level runner, not an org/user level runner.
// To avoid this, make it clear with {OwnerID: 0, RepoID: 1} for repo level runners.
type ActionRunner struct {
ID int64
UUID string `xorm:"CHAR(36) UNIQUE"`
Name string `xorm:"VARCHAR(255)"`
Version string `xorm:"VARCHAR(64)"`
OwnerID int64 `xorm:"index"`
Owner *user_model.User `xorm:"-"`
RepoID int64 `xorm:"index"`
Repo *repo_model.Repository `xorm:"-"`
Description string `xorm:"TEXT"`
Base int // 0 native 1 docker 2 virtual machine
RepoRange string // glob match which repositories could use this runner
Token string `xorm:"-"`
TokenHash string `xorm:"UNIQUE"` // sha256 of token
TokenSalt string
// TokenLastEight string `xorm:"token_last_eight"` // it's unnecessary because we don't find runners by token
LastOnline timeutil.TimeStamp `xorm:"index"`
LastActive timeutil.TimeStamp `xorm:"index"`
// Store labels defined in state file (default: .runner file) of `act_runner`
AgentLabels []string `xorm:"TEXT"`
// Store if this is a runner that only ever get one single job assigned
Ephemeral bool `xorm:"ephemeral NOT NULL DEFAULT false"`
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
Deleted timeutil.TimeStamp `xorm:"deleted"`
}
const (
RunnerOfflineTime = time.Minute
RunnerIdleTime = 10 * time.Second
)
// BelongsToOwnerName before calling, should guarantee that all attributes are loaded
func (r *ActionRunner) BelongsToOwnerName() string {
if r.RepoID != 0 {
return r.Repo.FullName()
}
if r.OwnerID != 0 {
return r.Owner.Name
}
return ""
}
func (r *ActionRunner) BelongsToOwnerType() types.OwnerType {
if r.RepoID != 0 {
return types.OwnerTypeRepository
}
if r.OwnerID != 0 {
switch r.Owner.Type {
case user_model.UserTypeOrganization:
return types.OwnerTypeOrganization
case user_model.UserTypeIndividual:
return types.OwnerTypeIndividual
}
}
return types.OwnerTypeSystemGlobal
}
// if the logic here changed, you should also modify FindRunnerOptions.ToCond
func (r *ActionRunner) Status() runnerv1.RunnerStatus {
if time.Since(r.LastOnline.AsTime()) > RunnerOfflineTime {
return runnerv1.RunnerStatus_RUNNER_STATUS_OFFLINE
}
if time.Since(r.LastActive.AsTime()) > RunnerIdleTime {
return runnerv1.RunnerStatus_RUNNER_STATUS_IDLE
}
return runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE
}
func (r *ActionRunner) StatusName() string {
return strings.ToLower(strings.TrimPrefix(r.Status().String(), "RUNNER_STATUS_"))
}
func (r *ActionRunner) StatusLocaleName(lang translation.Locale) string {
return lang.TrString("actions.runners.status." + r.StatusName())
}
func (r *ActionRunner) IsOnline() bool {
status := r.Status()
if status == runnerv1.RunnerStatus_RUNNER_STATUS_IDLE || status == runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE {
return true
}
return false
}
// EditableInContext checks if the runner is editable by the "context" owner/repo
// ownerID == 0 and repoID == 0 means "admin" context, any runner including global runners could be edited
// ownerID == 0 and repoID != 0 means "repo" context, any runner belonging to the given repo could be edited
// ownerID != 0 and repoID == 0 means "owner(org/user)" context, any runner belonging to the given user/org could be edited
// ownerID != 0 and repoID != 0 means "owner" OR "repo" context, legacy behavior, but we should forbid using it
func (r *ActionRunner) EditableInContext(ownerID, repoID int64) bool {
if ownerID != 0 && repoID != 0 {
setting.PanicInDevOrTesting("ownerID and repoID should not be both set")
}
if ownerID == 0 && repoID == 0 {
return true
}
if ownerID > 0 && r.OwnerID == ownerID {
return true
}
return repoID > 0 && r.RepoID == repoID
}
// LoadAttributes loads the attributes of the runner
func (r *ActionRunner) LoadAttributes(ctx context.Context) error {
if r.OwnerID > 0 {
var user user_model.User
has, err := db.GetEngine(ctx).ID(r.OwnerID).Get(&user)
if err != nil {
return err
}
if has {
r.Owner = &user
}
}
if r.RepoID > 0 {
var repo repo_model.Repository
has, err := db.GetEngine(ctx).ID(r.RepoID).Get(&repo)
if err != nil {
return err
}
if has {
r.Repo = &repo
}
}
return nil
}
func (r *ActionRunner) GenerateToken() (err error) {
r.Token, r.TokenSalt, r.TokenHash, _, err = generateSaltedToken()
return err
}
func init() {
db.RegisterModel(&ActionRunner{})
}
// FindRunnerOptions
// ownerID == 0 and repoID == 0 means any runner including global runners
// repoID != 0 and WithAvailable == false means any runner for the given repo
// repoID != 0 and WithAvailable == true means any runner for the given repo, parent user/org, and global runners
// ownerID != 0 and repoID == 0 and WithAvailable == false means any runner for the given user/org
// ownerID != 0 and repoID == 0 and WithAvailable == true means any runner for the given user/org and global runners
type FindRunnerOptions struct {
db.ListOptions
IDs []int64
RepoID int64
OwnerID int64 // it will be ignored if RepoID is set
Sort string
Filter string
IsOnline optional.Option[bool]
WithAvailable bool // not only runners belong to, but also runners can be used
}
func (opts FindRunnerOptions) ToConds() builder.Cond {
cond := builder.NewCond()
if len(opts.IDs) > 0 {
if len(opts.IDs) == 1 {
cond = cond.And(builder.Eq{"id": opts.IDs[0]})
} else {
cond = cond.And(builder.In("id", opts.IDs))
}
}
if opts.RepoID > 0 {
c := builder.NewCond().And(builder.Eq{"repo_id": opts.RepoID})
if opts.WithAvailable {
c = c.Or(builder.Eq{"owner_id": builder.Select("owner_id").From("repository").Where(builder.Eq{"id": opts.RepoID})})
c = c.Or(builder.Eq{"repo_id": 0, "owner_id": 0})
}
cond = cond.And(c)
} else if opts.OwnerID > 0 { // OwnerID is ignored if RepoID is set
c := builder.NewCond().And(builder.Eq{"owner_id": opts.OwnerID})
if opts.WithAvailable {
c = c.Or(builder.Eq{"repo_id": 0, "owner_id": 0})
}
cond = cond.And(c)
}
if opts.Filter != "" {
cond = cond.And(builder.Like{"name", opts.Filter})
}
if opts.IsOnline.Has() {
if opts.IsOnline.Value() {
cond = cond.And(builder.Gt{"last_online": time.Now().Add(-RunnerOfflineTime).Unix()})
} else {
cond = cond.And(builder.Lte{"last_online": time.Now().Add(-RunnerOfflineTime).Unix()})
}
}
return cond
}
func (opts FindRunnerOptions) ToOrders() string {
switch opts.Sort {
case "online":
return "last_online DESC"
case "offline":
return "last_online ASC"
case "alphabetically":
return "name ASC"
case "reversealphabetically":
return "name DESC"
case "newest":
return "id DESC"
case "oldest":
return "id ASC"
}
return "last_online DESC"
}
// GetRunnerByUUID returns a runner via uuid
func GetRunnerByUUID(ctx context.Context, uuid string) (*ActionRunner, error) {
var runner ActionRunner
has, err := db.GetEngine(ctx).Where("uuid=?", uuid).Get(&runner)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("runner with uuid %s: %w", uuid, util.ErrNotExist)
}
return &runner, nil
}
// GetRunnerByID returns a runner via id
func GetRunnerByID(ctx context.Context, id int64) (*ActionRunner, error) {
var runner ActionRunner
has, err := db.GetEngine(ctx).Where("id=?", id).Get(&runner)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("runner with id %d: %w", id, util.ErrNotExist)
}
return &runner, nil
}
// UpdateRunner updates runner's information.
func UpdateRunner(ctx context.Context, r *ActionRunner, cols ...string) error {
e := db.GetEngine(ctx)
r.Name = util.EllipsisDisplayString(r.Name, 255)
var err error
if len(cols) == 0 {
_, err = e.ID(r.ID).AllCols().Update(r)
} else {
_, err = e.ID(r.ID).Cols(cols...).Update(r)
}
return err
}
// DeleteRunner deletes a runner by given ID.
func DeleteRunner(ctx context.Context, id int64) error {
if _, err := GetRunnerByID(ctx, id); err != nil {
return err
}
_, err := db.DeleteByID[ActionRunner](ctx, id)
return err
}
// DeleteEphemeralRunner deletes a ephemeral runner by given ID.
func DeleteEphemeralRunner(ctx context.Context, id int64) error {
runner, err := GetRunnerByID(ctx, id)
if err != nil {
if errors.Is(err, util.ErrNotExist) {
return nil
}
return err
}
if !runner.Ephemeral {
return nil
}
_, err = db.DeleteByID[ActionRunner](ctx, id)
return err
}
// CreateRunner creates new runner.
func CreateRunner(ctx context.Context, t *ActionRunner) error {
if t.OwnerID != 0 && t.RepoID != 0 {
// It's trying to create a runner that belongs to a repository, but OwnerID has been set accidentally.
// Remove OwnerID to avoid confusion; it's not worth returning an error here.
t.OwnerID = 0
}
t.Name = util.EllipsisDisplayString(t.Name, 255)
return db.Insert(ctx, t)
}
func CountRunnersWithoutBelongingOwner(ctx context.Context) (int64, error) {
// Only affect action runners were a owner ID is set, as actions runners
// could also be created on a repository.
return db.GetEngine(ctx).Table("action_runner").
Join("LEFT", "`user`", "`action_runner`.owner_id = `user`.id").
Where("`action_runner`.owner_id != ?", 0).
And(builder.IsNull{"`user`.id"}).
Count(new(ActionRunner))
}
func FixRunnersWithoutBelongingOwner(ctx context.Context) (int64, error) {
subQuery := builder.Select("`action_runner`.id").
From("`action_runner`").
Join("LEFT", "`user`", "`action_runner`.owner_id = `user`.id").
Where(builder.Neq{"`action_runner`.owner_id": 0}).
And(builder.IsNull{"`user`.id"})
b := builder.Delete(builder.In("id", subQuery)).From("`action_runner`")
res, err := db.GetEngine(ctx).Exec(b)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func CountRunnersWithoutBelongingRepo(ctx context.Context) (int64, error) {
return db.GetEngine(ctx).Table("action_runner").
Join("LEFT", "`repository`", "`action_runner`.repo_id = `repository`.id").
Where("`action_runner`.repo_id != ?", 0).
And(builder.IsNull{"`repository`.id"}).
Count(new(ActionRunner))
}
func FixRunnersWithoutBelongingRepo(ctx context.Context) (int64, error) {
subQuery := builder.Select("`action_runner`.id").
From("`action_runner`").
Join("LEFT", "`repository`", "`action_runner`.repo_id = `repository`.id").
Where(builder.Neq{"`action_runner`.repo_id": 0}).
And(builder.IsNull{"`repository`.id"})
b := builder.Delete(builder.In("id", subQuery)).From("`action_runner`")
res, err := db.GetEngine(ctx).Exec(b)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func CountWrongRepoLevelRunners(ctx context.Context) (int64, error) {
var result int64
_, err := db.GetEngine(ctx).SQL("SELECT count(`id`) FROM `action_runner` WHERE `repo_id` > 0 AND `owner_id` > 0").Get(&result)
return result, err
}
func UpdateWrongRepoLevelRunners(ctx context.Context) (int64, error) {
result, err := db.GetEngine(ctx).Exec("UPDATE `action_runner` SET `owner_id` = 0 WHERE `repo_id` > 0 AND `owner_id` > 0")
if err != nil {
return 0, err
}
return result.RowsAffected()
}

View File

@@ -0,0 +1,65 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/container"
)
type RunnerList []*ActionRunner
// GetUserIDs returns a slice of user's id
func (runners RunnerList) GetUserIDs() []int64 {
return container.FilterSlice(runners, func(runner *ActionRunner) (int64, bool) {
return runner.OwnerID, runner.OwnerID != 0
})
}
func (runners RunnerList) LoadOwners(ctx context.Context) error {
userIDs := runners.GetUserIDs()
users := make(map[int64]*user_model.User, len(userIDs))
if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil {
return err
}
for _, runner := range runners {
if runner.OwnerID > 0 && runner.Owner == nil {
runner.Owner = users[runner.OwnerID]
}
}
return nil
}
func (runners RunnerList) getRepoIDs() []int64 {
return container.FilterSlice(runners, func(runner *ActionRunner) (int64, bool) {
return runner.RepoID, runner.RepoID > 0
})
}
func (runners RunnerList) LoadRepos(ctx context.Context) error {
repoIDs := runners.getRepoIDs()
repos := make(map[int64]*repo_model.Repository, len(repoIDs))
if err := db.GetEngine(ctx).In("id", repoIDs).Find(&repos); err != nil {
return err
}
for _, runner := range runners {
if runner.RepoID > 0 && runner.Repo == nil {
runner.Repo = repos[runner.RepoID]
}
}
return nil
}
func (runners RunnerList) LoadAttributes(ctx context.Context) error {
if err := runners.LoadOwners(ctx); err != nil {
return err
}
return runners.LoadRepos(ctx)
}

View File

@@ -0,0 +1,124 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"fmt"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)
// ActionRunnerToken represents runner tokens
//
// It can be:
// 1. global token, OwnerID is 0 and RepoID is 0
// 2. org/user level token, OwnerID is org/user ID and RepoID is 0
// 3. repo level token, OwnerID is 0 and RepoID is repo ID
//
// Please note that it's not acceptable to have both OwnerID and RepoID to be non-zero,
// or it will be complicated to find tokens belonging to a specific owner.
// For example, conditions like `OwnerID = 1` will also return token {OwnerID: 1, RepoID: 1},
// but it's a repo level token, not an org/user level token.
// To avoid this, make it clear with {OwnerID: 0, RepoID: 1} for repo level tokens.
type ActionRunnerToken struct {
ID int64
Token string `xorm:"UNIQUE"`
OwnerID int64 `xorm:"index"`
Owner *user_model.User `xorm:"-"`
RepoID int64 `xorm:"index"`
Repo *repo_model.Repository `xorm:"-"`
IsActive bool // true means it can be used
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
Deleted timeutil.TimeStamp `xorm:"deleted"`
}
func init() {
db.RegisterModel(new(ActionRunnerToken))
}
// GetRunnerToken returns a action runner via token
func GetRunnerToken(ctx context.Context, token string) (*ActionRunnerToken, error) {
var runnerToken ActionRunnerToken
has, err := db.GetEngine(ctx).Where("token=?", token).Get(&runnerToken)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf(`runner token "%s...": %w`, util.TruncateRunes(token, 3), util.ErrNotExist)
}
return &runnerToken, nil
}
// UpdateRunnerToken updates runner token information.
func UpdateRunnerToken(ctx context.Context, r *ActionRunnerToken, cols ...string) (err error) {
e := db.GetEngine(ctx)
if len(cols) == 0 {
_, err = e.ID(r.ID).AllCols().Update(r)
} else {
_, err = e.ID(r.ID).Cols(cols...).Update(r)
}
return err
}
// NewRunnerTokenWithValue creates a new active runner token and invalidate all old tokens
// ownerID will be ignored and treated as 0 if repoID is non-zero.
func NewRunnerTokenWithValue(ctx context.Context, ownerID, repoID int64, token string) (*ActionRunnerToken, error) {
if ownerID != 0 && repoID != 0 {
// It's trying to create a runner token that belongs to a repository, but OwnerID has been set accidentally.
// Remove OwnerID to avoid confusion; it's not worth returning an error here.
ownerID = 0
}
runnerToken := &ActionRunnerToken{
OwnerID: ownerID,
RepoID: repoID,
IsActive: true,
Token: token,
}
return runnerToken, db.WithTx(ctx, func(ctx context.Context) error {
if _, err := db.GetEngine(ctx).Where("owner_id =? AND repo_id = ?", ownerID, repoID).Cols("is_active").Update(&ActionRunnerToken{
IsActive: false,
}); err != nil {
return err
}
_, err := db.GetEngine(ctx).Insert(runnerToken)
return err
})
}
func NewRunnerToken(ctx context.Context, ownerID, repoID int64) (*ActionRunnerToken, error) {
token, err := util.CryptoRandomString(40)
if err != nil {
return nil, err
}
return NewRunnerTokenWithValue(ctx, ownerID, repoID, token)
}
// GetLatestRunnerToken returns the latest runner token
func GetLatestRunnerToken(ctx context.Context, ownerID, repoID int64) (*ActionRunnerToken, error) {
if ownerID != 0 && repoID != 0 {
// It's trying to get a runner token that belongs to a repository, but OwnerID has been set accidentally.
// Remove OwnerID to avoid confusion; it's not worth returning an error here.
ownerID = 0
}
var runnerToken ActionRunnerToken
has, err := db.GetEngine(ctx).Where("owner_id=? AND repo_id=?", ownerID, repoID).
OrderBy("id DESC").Get(&runnerToken)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("runner token: %w", util.ErrNotExist)
}
return &runnerToken, nil
}

View File

@@ -0,0 +1,39 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"testing"
"code.gitea.io/gitea/models/unittest"
"github.com/stretchr/testify/assert"
)
func TestGetLatestRunnerToken(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
token := unittest.AssertExistsAndLoadBean(t, &ActionRunnerToken{ID: 3})
expectedToken, err := GetLatestRunnerToken(t.Context(), 1, 0)
assert.NoError(t, err)
assert.Equal(t, expectedToken, token)
}
func TestNewRunnerToken(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
token, err := NewRunnerToken(t.Context(), 1, 0)
assert.NoError(t, err)
expectedToken, err := GetLatestRunnerToken(t.Context(), 1, 0)
assert.NoError(t, err)
assert.Equal(t, expectedToken, token)
}
func TestUpdateRunnerToken(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
token := unittest.AssertExistsAndLoadBean(t, &ActionRunnerToken{ID: 3})
token.IsActive = true
assert.NoError(t, UpdateRunnerToken(t.Context(), token))
expectedToken, err := GetLatestRunnerToken(t.Context(), 1, 0)
assert.NoError(t, err)
assert.Equal(t, expectedToken, token)
}

127
models/actions/schedule.go Normal file
View File

@@ -0,0 +1,127 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"fmt"
"time"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
webhook_module "code.gitea.io/gitea/modules/webhook"
)
// ActionSchedule represents a schedule of a workflow file
type ActionSchedule struct {
ID int64
Title string
Specs []string
RepoID int64 `xorm:"index"`
Repo *repo_model.Repository `xorm:"-"`
OwnerID int64 `xorm:"index"`
WorkflowID string
TriggerUserID int64
TriggerUser *user_model.User `xorm:"-"`
Ref string
CommitSHA string
Event webhook_module.HookEventType
EventPayload string `xorm:"LONGTEXT"`
Content []byte
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
func init() {
db.RegisterModel(new(ActionSchedule))
}
// GetSchedulesMapByIDs returns the schedules by given id slice.
func GetSchedulesMapByIDs(ctx context.Context, ids []int64) (map[int64]*ActionSchedule, error) {
schedules := make(map[int64]*ActionSchedule, len(ids))
if len(ids) == 0 {
return schedules, nil
}
return schedules, db.GetEngine(ctx).In("id", ids).Find(&schedules)
}
// CreateScheduleTask creates new schedule task.
func CreateScheduleTask(ctx context.Context, rows []*ActionSchedule) error {
// Return early if there are no rows to insert
if len(rows) == 0 {
return nil
}
return db.WithTx(ctx, func(ctx context.Context) error {
// Loop through each schedule row
for _, row := range rows {
row.Title = util.EllipsisDisplayString(row.Title, 255)
// Create new schedule row
if err := db.Insert(ctx, row); err != nil {
return err
}
// Loop through each schedule spec and create a new spec row
now := time.Now()
for _, spec := range row.Specs {
specRow := &ActionScheduleSpec{
RepoID: row.RepoID,
ScheduleID: row.ID,
Spec: spec,
}
// Parse the spec and check for errors
schedule, err := specRow.Parse()
if err != nil {
continue // skip to the next spec if there's an error
}
specRow.Next = timeutil.TimeStamp(schedule.Next(now).Unix())
// Insert the new schedule spec row
if err = db.Insert(ctx, specRow); err != nil {
return err
}
}
}
return nil
})
}
func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
return db.WithTx(ctx, func(ctx context.Context) error {
if _, err := db.GetEngine(ctx).Delete(&ActionSchedule{RepoID: id}); err != nil {
return err
}
if _, err := db.GetEngine(ctx).Delete(&ActionScheduleSpec{RepoID: id}); err != nil {
return err
}
return nil
})
}
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) ([]*ActionRunJob, error) {
// If actions disabled when there is schedule task, this will remove the outdated schedule tasks
// There is no other place we can do this because the app.ini will be changed manually
if err := DeleteScheduleTaskByRepo(ctx, repo.ID); err != nil {
return nil, fmt.Errorf("DeleteCronTaskByRepo: %v", err)
}
// cancel running cron jobs of this repository and delete old schedules
jobs, err := CancelPreviousJobs(
ctx,
repo.ID,
repo.DefaultBranch,
"",
webhook_module.HookEventSchedule,
)
if err != nil {
return jobs, fmt.Errorf("CancelPreviousJobs: %v", err)
}
return jobs, nil
}

View File

@@ -0,0 +1,83 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/container"
"xorm.io/builder"
)
type ScheduleList []*ActionSchedule
// GetUserIDs returns a slice of user's id
func (schedules ScheduleList) GetUserIDs() []int64 {
return container.FilterSlice(schedules, func(schedule *ActionSchedule) (int64, bool) {
return schedule.TriggerUserID, true
})
}
func (schedules ScheduleList) GetRepoIDs() []int64 {
return container.FilterSlice(schedules, func(schedule *ActionSchedule) (int64, bool) {
return schedule.RepoID, true
})
}
func (schedules ScheduleList) LoadTriggerUser(ctx context.Context) error {
userIDs := schedules.GetUserIDs()
users := make(map[int64]*user_model.User, len(userIDs))
if err := db.GetEngine(ctx).In("id", userIDs).Find(&users); err != nil {
return err
}
for _, schedule := range schedules {
if schedule.TriggerUserID == user_model.ActionsUserID {
schedule.TriggerUser = user_model.NewActionsUser()
} else {
schedule.TriggerUser = users[schedule.TriggerUserID]
if schedule.TriggerUser == nil {
schedule.TriggerUser = user_model.NewGhostUser()
}
}
}
return nil
}
func (schedules ScheduleList) LoadRepos(ctx context.Context) error {
repoIDs := schedules.GetRepoIDs()
repos, err := repo_model.GetRepositoriesMapByIDs(ctx, repoIDs)
if err != nil {
return err
}
for _, schedule := range schedules {
schedule.Repo = repos[schedule.RepoID]
}
return nil
}
type FindScheduleOptions struct {
db.ListOptions
RepoID int64
OwnerID int64
}
func (opts FindScheduleOptions) ToConds() builder.Cond {
cond := builder.NewCond()
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
}
if opts.OwnerID > 0 {
cond = cond.And(builder.Eq{"owner_id": opts.OwnerID})
}
return cond
}
func (opts FindScheduleOptions) ToOrders() string {
return "`id` DESC"
}

View File

@@ -0,0 +1,73 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"strings"
"time"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/timeutil"
"github.com/robfig/cron/v3"
)
// ActionScheduleSpec represents a schedule spec of a workflow file
type ActionScheduleSpec struct {
ID int64
RepoID int64 `xorm:"index"`
Repo *repo_model.Repository `xorm:"-"`
ScheduleID int64 `xorm:"index"`
Schedule *ActionSchedule `xorm:"-"`
// Next time the job will run, or the zero time if Cron has not been
// started or this entry's schedule is unsatisfiable
Next timeutil.TimeStamp `xorm:"index"`
// Prev is the last time this job was run, or the zero time if never.
Prev timeutil.TimeStamp
Spec string
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
// Parse parses the spec and returns a cron.Schedule
// Unlike the default cron parser, Parse uses UTC timezone as the default if none is specified.
func (s *ActionScheduleSpec) Parse() (cron.Schedule, error) {
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
schedule, err := parser.Parse(s.Spec)
if err != nil {
return nil, err
}
// If the spec has specified a timezone, use it
if strings.HasPrefix(s.Spec, "TZ=") || strings.HasPrefix(s.Spec, "CRON_TZ=") {
return schedule, nil
}
specSchedule, ok := schedule.(*cron.SpecSchedule)
// If it's not a spec schedule, like "@every 5m", timezone is not relevant
if !ok {
return schedule, nil
}
// Set the timezone to UTC
specSchedule.Location = time.UTC
return specSchedule, nil
}
func init() {
db.RegisterModel(new(ActionScheduleSpec))
}
func UpdateScheduleSpec(ctx context.Context, spec *ActionScheduleSpec, cols ...string) error {
sess := db.GetEngine(ctx).ID(spec.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
_, err := sess.Update(spec)
return err
}

View File

@@ -0,0 +1,97 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/container"
"xorm.io/builder"
)
type SpecList []*ActionScheduleSpec
func (specs SpecList) GetScheduleIDs() []int64 {
return container.FilterSlice(specs, func(spec *ActionScheduleSpec) (int64, bool) {
return spec.ScheduleID, true
})
}
func (specs SpecList) LoadSchedules(ctx context.Context) error {
scheduleIDs := specs.GetScheduleIDs()
schedules, err := GetSchedulesMapByIDs(ctx, scheduleIDs)
if err != nil {
return err
}
for _, spec := range specs {
spec.Schedule = schedules[spec.ScheduleID]
}
repoIDs := specs.GetRepoIDs()
repos, err := repo_model.GetRepositoriesMapByIDs(ctx, repoIDs)
if err != nil {
return err
}
for _, spec := range specs {
spec.Repo = repos[spec.RepoID]
}
return nil
}
func (specs SpecList) GetRepoIDs() []int64 {
return container.FilterSlice(specs, func(spec *ActionScheduleSpec) (int64, bool) {
return spec.RepoID, true
})
}
func (specs SpecList) LoadRepos(ctx context.Context) error {
repoIDs := specs.GetRepoIDs()
repos, err := repo_model.GetRepositoriesMapByIDs(ctx, repoIDs)
if err != nil {
return err
}
for _, spec := range specs {
spec.Repo = repos[spec.RepoID]
}
return nil
}
type FindSpecOptions struct {
db.ListOptions
RepoID int64
Next int64
}
func (opts FindSpecOptions) ToConds() builder.Cond {
cond := builder.NewCond()
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
}
if opts.Next > 0 {
cond = cond.And(builder.Lte{"next": opts.Next})
}
return cond
}
func (opts FindSpecOptions) ToOrders() string {
return "`id` DESC"
}
func FindSpecs(ctx context.Context, opts FindSpecOptions) (SpecList, int64, error) {
specs, total, err := db.FindAndCount[ActionScheduleSpec](ctx, opts)
if err != nil {
return nil, 0, err
}
if err := SpecList(specs).LoadSchedules(ctx); err != nil {
return nil, 0, err
}
return specs, total, nil
}

View File

@@ -0,0 +1,69 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"testing"
"time"
"code.gitea.io/gitea/modules/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestActionScheduleSpec_Parse(t *testing.T) {
// Mock the local timezone is not UTC
tz, err := time.LoadLocation("Asia/Shanghai")
require.NoError(t, err)
defer test.MockVariableValue(&time.Local, tz)()
now, err := time.Parse(time.RFC3339, "2024-07-31T15:47:55+08:00")
require.NoError(t, err)
tests := []struct {
name string
spec string
want string
wantErr assert.ErrorAssertionFunc
}{
{
name: "regular",
spec: "0 10 * * *",
want: "2024-07-31T10:00:00Z",
wantErr: assert.NoError,
},
{
name: "invalid",
spec: "0 10 * *",
want: "",
wantErr: assert.Error,
},
{
name: "with timezone",
spec: "TZ=America/New_York 0 10 * * *",
want: "2024-07-31T14:00:00Z",
wantErr: assert.NoError,
},
{
name: "timezone irrelevant",
spec: "@every 5m",
want: "2024-07-31T07:52:55Z",
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &ActionScheduleSpec{
Spec: tt.spec,
}
got, err := s.Parse()
tt.wantErr(t, err)
if err == nil {
assert.Equal(t, tt.want, got.Next(now).UTC().Format(time.RFC3339))
}
})
}
}

101
models/actions/status.go Normal file
View File

@@ -0,0 +1,101 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"slices"
"code.gitea.io/gitea/modules/translation"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
)
// Status represents the status of ActionRun, ActionRunJob, ActionTask, or ActionTaskStep
type Status int
const (
StatusUnknown Status = iota // 0, consistent with runnerv1.Result_RESULT_UNSPECIFIED
StatusSuccess // 1, consistent with runnerv1.Result_RESULT_SUCCESS
StatusFailure // 2, consistent with runnerv1.Result_RESULT_FAILURE
StatusCancelled // 3, consistent with runnerv1.Result_RESULT_CANCELLED
StatusSkipped // 4, consistent with runnerv1.Result_RESULT_SKIPPED
StatusWaiting // 5, isn't a runnerv1.Result
StatusRunning // 6, isn't a runnerv1.Result
StatusBlocked // 7, isn't a runnerv1.Result
)
var statusNames = map[Status]string{
StatusUnknown: "unknown",
StatusWaiting: "waiting",
StatusRunning: "running",
StatusSuccess: "success",
StatusFailure: "failure",
StatusCancelled: "cancelled",
StatusSkipped: "skipped",
StatusBlocked: "blocked",
}
// String returns the string name of the Status
func (s Status) String() string {
return statusNames[s]
}
// LocaleString returns the locale string name of the Status
func (s Status) LocaleString(lang translation.Locale) string {
return lang.TrString("actions.status." + s.String())
}
// IsDone returns whether the Status is final
func (s Status) IsDone() bool {
return s.In(StatusSuccess, StatusFailure, StatusCancelled, StatusSkipped)
}
// HasRun returns whether the Status is a result of running
func (s Status) HasRun() bool {
return s.In(StatusSuccess, StatusFailure)
}
func (s Status) IsUnknown() bool {
return s == StatusUnknown
}
func (s Status) IsSuccess() bool {
return s == StatusSuccess
}
func (s Status) IsFailure() bool {
return s == StatusFailure
}
func (s Status) IsCancelled() bool {
return s == StatusCancelled
}
func (s Status) IsSkipped() bool {
return s == StatusSkipped
}
func (s Status) IsWaiting() bool {
return s == StatusWaiting
}
func (s Status) IsRunning() bool {
return s == StatusRunning
}
func (s Status) IsBlocked() bool {
return s == StatusBlocked
}
// In returns whether s is one of the given statuses
func (s Status) In(statuses ...Status) bool {
return slices.Contains(statuses, s)
}
func (s Status) AsResult() runnerv1.Result {
if s.IsDone() {
return runnerv1.Result(s)
}
return runnerv1.Result_RESULT_UNSPECIFIED
}

526
models/actions/task.go Normal file
View File

@@ -0,0 +1,526 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"crypto/subtle"
"errors"
"fmt"
"time"
auth_model "code.gitea.io/gitea/models/auth"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unit"
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/nektos/act/pkg/jobparser"
"google.golang.org/protobuf/types/known/timestamppb"
"xorm.io/builder"
)
// ActionTask represents a distribution of job
type ActionTask struct {
ID int64
JobID int64
Job *ActionRunJob `xorm:"-"`
Steps []*ActionTaskStep `xorm:"-"`
Attempt int64
RunnerID int64 `xorm:"index"`
Status Status `xorm:"index"`
Started timeutil.TimeStamp `xorm:"index"`
Stopped timeutil.TimeStamp `xorm:"index(stopped_log_expired)"`
RepoID int64 `xorm:"index"`
OwnerID int64 `xorm:"index"`
CommitSHA string `xorm:"index"`
IsForkPullRequest bool
Token string `xorm:"-"`
TokenHash string `xorm:"UNIQUE"` // sha256 of token
TokenSalt string
TokenLastEight string `xorm:"index token_last_eight"`
LogFilename string // file name of log
LogInStorage bool // read log from database or from storage
LogLength int64 // lines count
LogSize int64 // blob size
LogIndexes LogIndexes `xorm:"LONGBLOB"` // line number to offset
LogExpired bool `xorm:"index(stopped_log_expired)"` // files that are too old will be deleted
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`
}
var successfulTokenTaskCache *lru.Cache[string, any]
func init() {
db.RegisterModel(new(ActionTask), func() error {
if setting.SuccessfulTokensCacheSize > 0 {
var err error
successfulTokenTaskCache, err = lru.New[string, any](setting.SuccessfulTokensCacheSize)
if err != nil {
return fmt.Errorf("unable to allocate Task cache: %v", err)
}
} else {
successfulTokenTaskCache = nil
}
return nil
})
}
func (task *ActionTask) Duration() time.Duration {
return calculateDuration(task.Started, task.Stopped, task.Status)
}
func (task *ActionTask) IsStopped() bool {
return task.Stopped > 0
}
func (task *ActionTask) GetRunLink() string {
if task.Job == nil || task.Job.Run == nil {
return ""
}
return task.Job.Run.Link()
}
func (task *ActionTask) GetCommitLink() string {
if task.Job == nil || task.Job.Run == nil || task.Job.Run.Repo == nil {
return ""
}
return task.Job.Run.Repo.CommitLink(task.CommitSHA)
}
func (task *ActionTask) GetRepoName() string {
if task.Job == nil || task.Job.Run == nil || task.Job.Run.Repo == nil {
return ""
}
return task.Job.Run.Repo.FullName()
}
func (task *ActionTask) GetRepoLink() string {
if task.Job == nil || task.Job.Run == nil || task.Job.Run.Repo == nil {
return ""
}
return task.Job.Run.Repo.Link()
}
func (task *ActionTask) LoadJob(ctx context.Context) error {
if task.Job == nil {
job, err := GetRunJobByID(ctx, task.JobID)
if err != nil {
return err
}
task.Job = job
}
return nil
}
// LoadAttributes load Job Steps if not loaded
func (task *ActionTask) LoadAttributes(ctx context.Context) error {
if task == nil {
return nil
}
if err := task.LoadJob(ctx); err != nil {
return err
}
if err := task.Job.LoadAttributes(ctx); err != nil {
return err
}
if task.Steps == nil { // be careful, an empty slice (not nil) also means loaded
steps, err := GetTaskStepsByTaskID(ctx, task.ID)
if err != nil {
return err
}
task.Steps = steps
}
return nil
}
func (task *ActionTask) GenerateToken() (err error) {
task.Token, task.TokenSalt, task.TokenHash, task.TokenLastEight, err = generateSaltedToken()
return err
}
func GetTaskByID(ctx context.Context, id int64) (*ActionTask, error) {
var task ActionTask
has, err := db.GetEngine(ctx).Where("id=?", id).Get(&task)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("task with id %d: %w", id, util.ErrNotExist)
}
return &task, nil
}
func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, error) {
errNotExist := fmt.Errorf("task with token %q: %w", token, util.ErrNotExist)
if token == "" {
return nil, errNotExist
}
// A token is defined as being SHA1 sum these are 40 hexadecimal bytes long
if len(token) != 40 {
return nil, errNotExist
}
for _, x := range []byte(token) {
if x < '0' || (x > '9' && x < 'a') || x > 'f' {
return nil, errNotExist
}
}
lastEight := token[len(token)-8:]
if id := getTaskIDFromCache(token); id > 0 {
task := &ActionTask{
TokenLastEight: lastEight,
}
// Re-get the task from the db in case it has been deleted in the intervening period
has, err := db.GetEngine(ctx).ID(id).Get(task)
if err != nil {
return nil, err
}
if has {
return task, nil
}
successfulTokenTaskCache.Remove(token)
}
var tasks []*ActionTask
err := db.GetEngine(ctx).Where("token_last_eight = ? AND status = ?", lastEight, StatusRunning).Find(&tasks)
if err != nil {
return nil, err
} else if len(tasks) == 0 {
return nil, errNotExist
}
for _, t := range tasks {
tempHash := auth_model.HashToken(token, t.TokenSalt)
if subtle.ConstantTimeCompare([]byte(t.TokenHash), []byte(tempHash)) == 1 {
if successfulTokenTaskCache != nil {
successfulTokenTaskCache.Add(token, t.ID)
}
return t, nil
}
}
return nil, errNotExist
}
func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return nil, false, err
}
defer committer.Close()
e := db.GetEngine(ctx)
jobCond := builder.NewCond()
if runner.RepoID != 0 {
jobCond = builder.Eq{"repo_id": runner.RepoID}
} else if runner.OwnerID != 0 {
jobCond = builder.In("repo_id", builder.Select("`repository`.id").From("repository").
Join("INNER", "repo_unit", "`repository`.id = `repo_unit`.repo_id").
Where(builder.Eq{"`repository`.owner_id": runner.OwnerID, "`repo_unit`.type": unit.TypeActions}))
}
if jobCond.IsValid() {
jobCond = builder.In("run_id", builder.Select("id").From("action_run").Where(jobCond))
}
var jobs []*ActionRunJob
if err := e.Where("task_id=? AND status=?", 0, StatusWaiting).And(jobCond).Asc("updated", "id").Find(&jobs); err != nil {
return nil, false, err
}
// TODO: a more efficient way to filter labels
var job *ActionRunJob
log.Trace("runner labels: %v", runner.AgentLabels)
for _, v := range jobs {
if isSubset(runner.AgentLabels, v.RunsOn) {
job = v
break
}
}
if job == nil {
return nil, false, nil
}
if err := job.LoadAttributes(ctx); err != nil {
return nil, false, err
}
now := timeutil.TimeStampNow()
job.Attempt++
job.Started = now
job.Status = StatusRunning
task := &ActionTask{
JobID: job.ID,
Attempt: job.Attempt,
RunnerID: runner.ID,
Started: now,
Status: StatusRunning,
RepoID: job.RepoID,
OwnerID: job.OwnerID,
CommitSHA: job.CommitSHA,
IsForkPullRequest: job.IsForkPullRequest,
}
if err := task.GenerateToken(); err != nil {
return nil, false, err
}
parsedWorkflows, err := jobparser.Parse(job.WorkflowPayload)
if err != nil {
return nil, false, fmt.Errorf("parse workflow of job %d: %w", job.ID, err)
} else if len(parsedWorkflows) != 1 {
return nil, false, fmt.Errorf("workflow of job %d: not single workflow", job.ID)
}
_, workflowJob := parsedWorkflows[0].Job()
if _, err := e.Insert(task); err != nil {
return nil, false, err
}
task.LogFilename = logFileName(job.Run.Repo.FullName(), task.ID)
if err := UpdateTask(ctx, task, "log_filename"); err != nil {
return nil, false, err
}
if len(workflowJob.Steps) > 0 {
steps := make([]*ActionTaskStep, len(workflowJob.Steps))
for i, v := range workflowJob.Steps {
name := util.EllipsisDisplayString(v.String(), 255)
steps[i] = &ActionTaskStep{
Name: name,
TaskID: task.ID,
Index: int64(i),
RepoID: task.RepoID,
Status: StatusWaiting,
}
}
if _, err := e.Insert(steps); err != nil {
return nil, false, err
}
task.Steps = steps
}
job.TaskID = task.ID
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
return nil, false, err
} else if n != 1 {
return nil, false, nil
}
task.Job = job
if err := committer.Commit(); err != nil {
return nil, false, err
}
return task, true, nil
}
func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error {
sess := db.GetEngine(ctx).ID(task.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
_, err := sess.Update(task)
// Automatically delete the ephemeral runner if the task is done
if err == nil && task.Status.IsDone() && util.SliceContainsString(cols, "status") {
return DeleteEphemeralRunner(ctx, task.RunnerID)
}
return err
}
// UpdateTaskByState updates the task by the state.
// It will always update the task if the state is not final, even there is no change.
// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.TaskState) (*ActionTask, error) {
stepStates := map[int64]*runnerv1.StepState{}
for _, v := range state.Steps {
stepStates[v.Id] = v
}
return db.WithTx2(ctx, func(ctx context.Context) (*ActionTask, error) {
e := db.GetEngine(ctx)
task := &ActionTask{}
if has, err := e.ID(state.Id).Get(task); err != nil {
return nil, err
} else if !has {
return nil, util.ErrNotExist
} else if runnerID != task.RunnerID {
return nil, errors.New("invalid runner for task")
}
if task.Status.IsDone() {
// the state is final, do nothing
return task, nil
}
// state.Result is not unspecified means the task is finished
if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
task.Status = Status(state.Result)
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
return nil, err
}
if _, err := UpdateRunJob(ctx, &ActionRunJob{
ID: task.JobID,
Status: task.Status,
Stopped: task.Stopped,
}, nil); err != nil {
return nil, err
}
} else {
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
task.Updated = timeutil.TimeStampNow()
if err := UpdateTask(ctx, task, "updated"); err != nil {
return nil, err
}
}
if err := task.LoadAttributes(ctx); err != nil {
return nil, err
}
for _, step := range task.Steps {
var result runnerv1.Result
if v, ok := stepStates[step.Index]; ok {
result = v.Result
step.LogIndex = v.LogIndex
step.LogLength = v.LogLength
step.Started = convertTimestamp(v.StartedAt)
step.Stopped = convertTimestamp(v.StoppedAt)
}
if result != runnerv1.Result_RESULT_UNSPECIFIED {
step.Status = Status(result)
} else if step.Started != 0 {
step.Status = StatusRunning
}
if _, err := e.ID(step.ID).Update(step); err != nil {
return nil, err
}
}
return task, nil
})
}
func StopTask(ctx context.Context, taskID int64, status Status) error {
if !status.IsDone() {
return fmt.Errorf("cannot stop task with status %v", status)
}
e := db.GetEngine(ctx)
task := &ActionTask{}
if has, err := e.ID(taskID).Get(task); err != nil {
return err
} else if !has {
return util.ErrNotExist
}
if task.Status.IsDone() {
return nil
}
now := timeutil.TimeStampNow()
task.Status = status
task.Stopped = now
if _, err := UpdateRunJob(ctx, &ActionRunJob{
ID: task.JobID,
Status: task.Status,
Stopped: task.Stopped,
}, nil); err != nil {
return err
}
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
return err
}
if err := task.LoadAttributes(ctx); err != nil {
return err
}
for _, step := range task.Steps {
if !step.Status.IsDone() {
step.Status = status
if step.Started == 0 {
step.Started = now
}
step.Stopped = now
}
if _, err := e.ID(step.ID).Update(step); err != nil {
return err
}
}
return nil
}
func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, limit int) ([]*ActionTask, error) {
e := db.GetEngine(ctx)
tasks := make([]*ActionTask, 0, limit)
// Check "stopped > 0" to avoid deleting tasks that are still running
return tasks, e.Where("stopped > 0 AND stopped < ? AND log_expired = ?", olderThan, false).
Limit(limit).
Find(&tasks)
}
func isSubset(set, subset []string) bool {
m := make(container.Set[string], len(set))
for _, v := range set {
m.Add(v)
}
for _, v := range subset {
if !m.Contains(v) {
return false
}
}
return true
}
func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp {
if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 {
return timeutil.TimeStamp(0)
}
return timeutil.TimeStamp(timestamp.AsTime().Unix())
}
func logFileName(repoFullName string, taskID int64) string {
ret := fmt.Sprintf("%s/%02x/%d.log", repoFullName, taskID%256, taskID)
if setting.Actions.LogCompression.IsZstd() {
ret += ".zst"
}
return ret
}
func getTaskIDFromCache(token string) int64 {
if successfulTokenTaskCache == nil {
return 0
}
tInterface, ok := successfulTokenTaskCache.Get(token)
if !ok {
return 0
}
t, ok := tInterface.(int64)
if !ok {
return 0
}
return t
}

View File

@@ -0,0 +1,91 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/timeutil"
"xorm.io/builder"
)
type TaskList []*ActionTask
func (tasks TaskList) GetJobIDs() []int64 {
return container.FilterSlice(tasks, func(t *ActionTask) (int64, bool) {
return t.JobID, t.JobID != 0
})
}
func (tasks TaskList) LoadJobs(ctx context.Context) error {
jobIDs := tasks.GetJobIDs()
jobs := make(map[int64]*ActionRunJob, len(jobIDs))
if err := db.GetEngine(ctx).In("id", jobIDs).Find(&jobs); err != nil {
return err
}
for _, t := range tasks {
if t.JobID > 0 && t.Job == nil {
t.Job = jobs[t.JobID]
}
}
// TODO: Replace with "ActionJobList(maps.Values(jobs))" once available
var jobsList ActionJobList = make([]*ActionRunJob, 0, len(jobs))
for _, j := range jobs {
jobsList = append(jobsList, j)
}
return jobsList.LoadAttributes(ctx, true)
}
func (tasks TaskList) LoadAttributes(ctx context.Context) error {
return tasks.LoadJobs(ctx)
}
type FindTaskOptions struct {
db.ListOptions
RepoID int64
JobID int64
OwnerID int64
CommitSHA string
Status Status
UpdatedBefore timeutil.TimeStamp
StartedBefore timeutil.TimeStamp
RunnerID int64
}
func (opts FindTaskOptions) ToConds() builder.Cond {
cond := builder.NewCond()
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
}
if opts.JobID > 0 {
cond = cond.And(builder.Eq{"job_id": opts.JobID})
}
if opts.OwnerID > 0 {
cond = cond.And(builder.Eq{"owner_id": opts.OwnerID})
}
if opts.CommitSHA != "" {
cond = cond.And(builder.Eq{"commit_sha": opts.CommitSHA})
}
if opts.Status > StatusUnknown {
cond = cond.And(builder.Eq{"status": opts.Status})
}
if opts.UpdatedBefore > 0 {
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
}
if opts.StartedBefore > 0 {
cond = cond.And(builder.Lt{"started": opts.StartedBefore})
}
if opts.RunnerID > 0 {
cond = cond.And(builder.Eq{"runner_id": opts.RunnerID})
}
return cond
}
func (opts FindTaskOptions) ToOrders() string {
return "`id` DESC"
}

View File

@@ -0,0 +1,55 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
)
// ActionTaskOutput represents an output of ActionTask.
// So the outputs are bound to a task, that means when a completed job has been rerun,
// the outputs of the job will be reset because the task is new.
// It's by design, to avoid the outputs of the old task to be mixed with the new task.
type ActionTaskOutput struct {
ID int64
TaskID int64 `xorm:"INDEX UNIQUE(task_id_output_key)"`
OutputKey string `xorm:"VARCHAR(255) UNIQUE(task_id_output_key)"`
OutputValue string `xorm:"MEDIUMTEXT"`
}
func init() {
db.RegisterModel(new(ActionTaskOutput))
}
// FindTaskOutputByTaskID returns the outputs of the task.
func FindTaskOutputByTaskID(ctx context.Context, taskID int64) ([]*ActionTaskOutput, error) {
var outputs []*ActionTaskOutput
return outputs, db.GetEngine(ctx).Where("task_id=?", taskID).Find(&outputs)
}
// FindTaskOutputKeyByTaskID returns the keys of the outputs of the task.
func FindTaskOutputKeyByTaskID(ctx context.Context, taskID int64) ([]string, error) {
var keys []string
return keys, db.GetEngine(ctx).Table(ActionTaskOutput{}).Where("task_id=?", taskID).Cols("output_key").Find(&keys)
}
// InsertTaskOutputIfNotExist inserts a new task output if it does not exist.
func InsertTaskOutputIfNotExist(ctx context.Context, taskID int64, key, value string) error {
return db.WithTx(ctx, func(ctx context.Context) error {
sess := db.GetEngine(ctx)
if exist, err := sess.Exist(&ActionTaskOutput{TaskID: taskID, OutputKey: key}); err != nil {
return err
} else if exist {
return nil
}
_, err := sess.Insert(&ActionTaskOutput{
TaskID: taskID,
OutputKey: key,
OutputValue: value,
})
return err
})
}

View File

@@ -0,0 +1,41 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"time"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/timeutil"
)
// ActionTaskStep represents a step of ActionTask
type ActionTaskStep struct {
ID int64
Name string `xorm:"VARCHAR(255)"`
TaskID int64 `xorm:"index unique(task_index)"`
Index int64 `xorm:"index unique(task_index)"`
RepoID int64 `xorm:"index"`
Status Status `xorm:"index"`
LogIndex int64
LogLength int64
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated"`
}
func (step *ActionTaskStep) Duration() time.Duration {
return calculateDuration(step.Started, step.Stopped, step.Status)
}
func init() {
db.RegisterModel(new(ActionTaskStep))
}
func GetTaskStepsByTaskID(ctx context.Context, taskID int64) ([]*ActionTaskStep, error) {
var steps []*ActionTaskStep
return steps, db.GetEngine(ctx).Where("task_id=?", taskID).OrderBy("`index` ASC").Find(&steps)
}

View File

@@ -0,0 +1,101 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
)
// ActionTasksVersion
// If both ownerID and repoID is zero, its scope is global.
// If ownerID is not zero and repoID is zero, its scope is org (there is no user-level runner currently).
// If ownerID is zero and repoID is not zero, its scope is repo.
type ActionTasksVersion struct {
ID int64 `xorm:"pk autoincr"`
OwnerID int64 `xorm:"UNIQUE(owner_repo)"`
RepoID int64 `xorm:"INDEX UNIQUE(owner_repo)"`
Version int64
CreatedUnix timeutil.TimeStamp `xorm:"created"`
UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
}
func init() {
db.RegisterModel(new(ActionTasksVersion))
}
func GetTasksVersionByScope(ctx context.Context, ownerID, repoID int64) (int64, error) {
var tasksVersion ActionTasksVersion
has, err := db.GetEngine(ctx).Where("owner_id = ? AND repo_id = ?", ownerID, repoID).Get(&tasksVersion)
if err != nil {
return 0, err
} else if !has {
return 0, nil
}
return tasksVersion.Version, err
}
func insertTasksVersion(ctx context.Context, ownerID, repoID int64) (*ActionTasksVersion, error) {
tasksVersion := &ActionTasksVersion{
OwnerID: ownerID,
RepoID: repoID,
Version: 1,
}
if _, err := db.GetEngine(ctx).Insert(tasksVersion); err != nil {
return nil, err
}
return tasksVersion, nil
}
func increaseTasksVersionByScope(ctx context.Context, ownerID, repoID int64) error {
result, err := db.GetEngine(ctx).Exec("UPDATE action_tasks_version SET version = version + 1 WHERE owner_id = ? AND repo_id = ?", ownerID, repoID)
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return err
}
if affected == 0 {
// if update sql does not affect any rows, the database may be broken,
// so re-insert the row of version data here.
if _, err := insertTasksVersion(ctx, ownerID, repoID); err != nil {
return err
}
}
return nil
}
func IncreaseTaskVersion(ctx context.Context, ownerID, repoID int64) error {
return db.WithTx(ctx, func(ctx context.Context) error {
// 1. increase global
if err := increaseTasksVersionByScope(ctx, 0, 0); err != nil {
log.Error("IncreaseTasksVersionByScope(Global): %v", err)
return err
}
// 2. increase owner
if ownerID > 0 {
if err := increaseTasksVersionByScope(ctx, ownerID, 0); err != nil {
log.Error("IncreaseTasksVersionByScope(Owner): %v", err)
return err
}
}
// 3. increase repo
if repoID > 0 {
if err := increaseTasksVersionByScope(ctx, 0, repoID); err != nil {
log.Error("IncreaseTasksVersionByScope(Repo): %v", err)
return err
}
}
return nil
})
}

103
models/actions/utils.go Normal file
View File

@@ -0,0 +1,103 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"time"
auth_model "code.gitea.io/gitea/models/auth"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)
func generateSaltedToken() (string, string, string, string, error) {
salt, err := util.CryptoRandomString(10)
if err != nil {
return "", "", "", "", err
}
buf, err := util.CryptoRandomBytes(20)
if err != nil {
return "", "", "", "", err
}
token := hex.EncodeToString(buf)
hash := auth_model.HashToken(token, salt)
return token, salt, hash, token[len(token)-8:], nil
}
/*
LogIndexes is the index for mapping log line number to buffer offset.
Because it uses varint encoding, it is impossible to predict its size.
But we can make a simple estimate with an assumption that each log line has 200 byte, then:
| lines | file size | index size |
|-----------|---------------------|--------------------|
| 100 | 20 KiB(20000) | 258 B(258) |
| 1000 | 195 KiB(200000) | 2.9 KiB(2958) |
| 10000 | 1.9 MiB(2000000) | 34 KiB(34715) |
| 100000 | 19 MiB(20000000) | 386 KiB(394715) |
| 1000000 | 191 MiB(200000000) | 4.1 MiB(4323626) |
| 10000000 | 1.9 GiB(2000000000) | 47 MiB(49323626) |
| 100000000 | 19 GiB(20000000000) | 490 MiB(513424280) |
*/
type LogIndexes []int64
func (indexes *LogIndexes) FromDB(b []byte) error {
reader := bytes.NewReader(b)
for {
v, err := binary.ReadVarint(reader)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("binary ReadVarint: %w", err)
}
*indexes = append(*indexes, v)
}
}
func (indexes *LogIndexes) ToDB() ([]byte, error) {
buf, i := make([]byte, binary.MaxVarintLen64*len(*indexes)), 0
for _, v := range *indexes {
n := binary.PutVarint(buf[i:], v)
i += n
}
return buf[:i], nil
}
var timeSince = time.Since
func calculateDuration(started, stopped timeutil.TimeStamp, status Status) time.Duration {
if started == 0 {
return 0
}
s := started.AsTime()
if status.IsDone() {
return stopped.AsTime().Sub(s)
}
return timeSince(s).Truncate(time.Second)
}
// best effort function to convert an action schedule to action run, to be used in GenerateGiteaContext
func (s *ActionSchedule) ToActionRun() *ActionRun {
return &ActionRun{
Title: s.Title,
RepoID: s.RepoID,
Repo: s.Repo,
OwnerID: s.OwnerID,
WorkflowID: s.WorkflowID,
TriggerUserID: s.TriggerUserID,
TriggerUser: s.TriggerUser,
Ref: s.Ref,
CommitSHA: s.CommitSHA,
Event: s.Event,
EventPayload: s.EventPayload,
Created: s.Created,
Updated: s.Updated,
}
}

View File

@@ -0,0 +1,90 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"math"
"testing"
"time"
"code.gitea.io/gitea/modules/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestLogIndexes_ToDB(t *testing.T) {
tests := []struct {
indexes LogIndexes
}{
{
indexes: []int64{1, 2, 0, -1, -2, math.MaxInt64, math.MinInt64},
},
}
for _, tt := range tests {
t.Run("", func(t *testing.T) {
got, err := tt.indexes.ToDB()
require.NoError(t, err)
indexes := LogIndexes{}
require.NoError(t, indexes.FromDB(got))
assert.Equal(t, tt.indexes, indexes)
})
}
}
func Test_calculateDuration(t *testing.T) {
oldTimeSince := timeSince
defer func() {
timeSince = oldTimeSince
}()
timeSince = func(t time.Time) time.Duration {
return timeutil.TimeStamp(1000).AsTime().Sub(t)
}
type args struct {
started timeutil.TimeStamp
stopped timeutil.TimeStamp
status Status
}
tests := []struct {
name string
args args
want time.Duration
}{
{
name: "unknown",
args: args{
started: 0,
stopped: 0,
status: StatusUnknown,
},
want: 0,
},
{
name: "running",
args: args{
started: 500,
stopped: 0,
status: StatusRunning,
},
want: 500 * time.Second,
},
{
name: "done",
args: args{
started: 500,
stopped: 600,
status: StatusSuccess,
},
want: 100 * time.Second,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, calculateDuration(tt.args.started, tt.args.stopped, tt.args.status), "calculateDuration(%v, %v, %v)", tt.args.started, tt.args.stopped, tt.args.status)
})
}
}

184
models/actions/variable.go Normal file
View File

@@ -0,0 +1,184 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"strings"
"unicode/utf8"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
"xorm.io/builder"
)
// ActionVariable represents a variable that can be used in actions
//
// It can be:
// 1. global variable, OwnerID is 0 and RepoID is 0
// 2. org/user level variable, OwnerID is org/user ID and RepoID is 0
// 3. repo level variable, OwnerID is 0 and RepoID is repo ID
//
// Please note that it's not acceptable to have both OwnerID and RepoID to be non-zero,
// or it will be complicated to find variables belonging to a specific owner.
// For example, conditions like `OwnerID = 1` will also return variable {OwnerID: 1, RepoID: 1},
// but it's a repo level variable, not an org/user level variable.
// To avoid this, make it clear with {OwnerID: 0, RepoID: 1} for repo level variables.
type ActionVariable struct {
ID int64 `xorm:"pk autoincr"`
OwnerID int64 `xorm:"UNIQUE(owner_repo_name)"`
RepoID int64 `xorm:"INDEX UNIQUE(owner_repo_name)"`
Name string `xorm:"UNIQUE(owner_repo_name) NOT NULL"`
Data string `xorm:"LONGTEXT NOT NULL"`
Description string `xorm:"TEXT"`
CreatedUnix timeutil.TimeStamp `xorm:"created NOT NULL"`
UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
}
const (
VariableDataMaxLength = 65536
VariableDescriptionMaxLength = 4096
)
func init() {
db.RegisterModel(new(ActionVariable))
}
func InsertVariable(ctx context.Context, ownerID, repoID int64, name, data, description string) (*ActionVariable, error) {
if ownerID != 0 && repoID != 0 {
// It's trying to create a variable that belongs to a repository, but OwnerID has been set accidentally.
// Remove OwnerID to avoid confusion; it's not worth returning an error here.
ownerID = 0
}
if utf8.RuneCountInString(data) > VariableDataMaxLength {
return nil, util.NewInvalidArgumentErrorf("data too long")
}
description = util.TruncateRunes(description, VariableDescriptionMaxLength)
variable := &ActionVariable{
OwnerID: ownerID,
RepoID: repoID,
Name: strings.ToUpper(name),
Data: data,
Description: description,
}
return variable, db.Insert(ctx, variable)
}
type FindVariablesOpts struct {
db.ListOptions
IDs []int64
RepoID int64
OwnerID int64 // it will be ignored if RepoID is set
Name string
}
func (opts FindVariablesOpts) ToConds() builder.Cond {
cond := builder.NewCond()
if len(opts.IDs) > 0 {
if len(opts.IDs) == 1 {
cond = cond.And(builder.Eq{"id": opts.IDs[0]})
} else {
cond = cond.And(builder.In("id", opts.IDs))
}
}
// Since we now support instance-level variables,
// there is no need to check for null values for `owner_id` and `repo_id`
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
if opts.RepoID != 0 { // if RepoID is set
// ignore OwnerID and treat it as 0
cond = cond.And(builder.Eq{"owner_id": 0})
} else {
cond = cond.And(builder.Eq{"owner_id": opts.OwnerID})
}
if opts.Name != "" {
cond = cond.And(builder.Eq{"name": strings.ToUpper(opts.Name)})
}
return cond
}
func FindVariables(ctx context.Context, opts FindVariablesOpts) ([]*ActionVariable, error) {
return db.Find[ActionVariable](ctx, opts)
}
func UpdateVariableCols(ctx context.Context, variable *ActionVariable, cols ...string) (bool, error) {
if utf8.RuneCountInString(variable.Data) > VariableDataMaxLength {
return false, util.NewInvalidArgumentErrorf("data too long")
}
variable.Description = util.TruncateRunes(variable.Description, VariableDescriptionMaxLength)
variable.Name = strings.ToUpper(variable.Name)
count, err := db.GetEngine(ctx).
ID(variable.ID).
Cols(cols...).
Update(variable)
return count != 0, err
}
func DeleteVariable(ctx context.Context, id int64) error {
if _, err := db.DeleteByID[ActionVariable](ctx, id); err != nil {
return err
}
return nil
}
func GetVariablesOfRun(ctx context.Context, run *ActionRun) (map[string]string, error) {
variables := map[string]string{}
if err := run.LoadRepo(ctx); err != nil {
log.Error("LoadRepo: %v", err)
return nil, err
}
// Global
globalVariables, err := db.Find[ActionVariable](ctx, FindVariablesOpts{})
if err != nil {
log.Error("find global variables: %v", err)
return nil, err
}
// Org / User level
ownerVariables, err := db.Find[ActionVariable](ctx, FindVariablesOpts{OwnerID: run.Repo.OwnerID})
if err != nil {
log.Error("find variables of org: %d, error: %v", run.Repo.OwnerID, err)
return nil, err
}
// Repo level
repoVariables, err := db.Find[ActionVariable](ctx, FindVariablesOpts{RepoID: run.RepoID})
if err != nil {
log.Error("find variables of repo: %d, error: %v", run.RepoID, err)
return nil, err
}
// Level precedence: Repo > Org / User > Global
for _, v := range append(globalVariables, append(ownerVariables, repoVariables...)...) {
variables[v.Name] = v.Data
}
return variables, nil
}
func CountWrongRepoLevelVariables(ctx context.Context) (int64, error) {
var result int64
_, err := db.GetEngine(ctx).SQL("SELECT count(`id`) FROM `action_variable` WHERE `repo_id` > 0 AND `owner_id` > 0").Get(&result)
return result, err
}
func UpdateWrongRepoLevelVariables(ctx context.Context) (int64, error) {
result, err := db.GetEngine(ctx).Exec("UPDATE `action_variable` SET `owner_id` = 0 WHERE `repo_id` > 0 AND `owner_id` > 0")
if err != nil {
return 0, err
}
return result.RowsAffected()
}