Files
act_runner/internal/app/run/runner.go
dcsunny d3ce144350
Some checks failed
release-nightly / goreleaser (push) Failing after 1m32s
release-nightly / release-image (map[tag_suffix:-dind target:dind]) (push) Has been cancelled
release-nightly / release-image (map[tag_suffix:-dind-rootless target:dind-rootless]) (push) Has been cancelled
release-nightly / release-image (map[tag_suffix: target:basic]) (push) Has been cancelled
checks / check and test (push) Has been cancelled
fix(runner): 解决job容器无法访问缓存服务器的问题
- 修改cache URL设置逻辑,将原始URL转换为可通过宿主机网关访问的地址
- 添加从原始URL中提取端口并替换为宿主机网关IP的功能
- 实现getGatewayIP函数获取容器网络中的宿主机网关IP
- 添加日志记录原始URL和最终使用的缓存URL便于调试
- 支持通过配置文件指定固定的缓存host地址
- 使用172.17.0.1作为Docker默认bridge网络的网关地址
2026-01-30 10:49:05 +08:00

300 lines
9.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package run
import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"strings"
"sync"
"time"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"connectrpc.com/connect"
"github.com/docker/docker/api/types/container"
"github.com/nektos/act/pkg/artifactcache"
"github.com/nektos/act/pkg/common"
"github.com/nektos/act/pkg/model"
"github.com/nektos/act/pkg/runner"
log "github.com/sirupsen/logrus"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/labels"
"gitea.com/gitea/act_runner/internal/pkg/report"
"gitea.com/gitea/act_runner/internal/pkg/ver"
)
// Runner runs the pipeline.
type Runner struct {
name string
cfg *config.Config
client client.Client
labels labels.Labels
envs map[string]string
runningTasks sync.Map
}
func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client) *Runner {
ls := labels.Labels{}
for _, v := range reg.Labels {
if l, err := labels.Parse(v); err == nil {
ls = append(ls, l)
}
}
envs := make(map[string]string, len(cfg.Runner.Envs))
for k, v := range cfg.Runner.Envs {
envs[k] = v
}
if cfg.Cache.Enabled == nil || *cfg.Cache.Enabled {
if cfg.Cache.ExternalServer != "" {
envs["ACTIONS_CACHE_URL"] = cfg.Cache.ExternalServer
} else {
cacheHandler, err := artifactcache.StartHandler(
cfg.Cache.Dir,
cfg.Cache.Host,
cfg.Cache.Port,
log.StandardLogger().WithField("module", "cache_request"),
)
if err != nil {
log.Errorf("cannot init cache server, it will be disabled: %v", err)
// go on
} else {
// 获取缓存服务器的外部URL
originalURL := cacheHandler.ExternalURL()
log.Infof("Cache server ExternalURL: %s", originalURL)
// 从原始URL中提取端口并使用宿主机网关IP
// job容器在不同网络中需要通过宿主机网关访问缓存服务器
cacheURL := originalURL
if strings.HasPrefix(originalURL, "http://") {
parts := strings.Split(originalURL, ":")
if len(parts) >= 3 {
port := parts[len(parts)-1]
// 移除可能的路径部分
if idx := strings.Index(port, "/"); idx != -1 {
port = port[:idx]
}
// 使用宿主机网关IPjob容器可以通过网关访问宿主机服务
// 通常Docker网络的网关是 x.x.x.1job容器可以通过这个IP访问宿主机
cacheURL = fmt.Sprintf("http://%s:%s", getGatewayIP(cfg), port)
}
}
log.Infof("Using cache URL for job containers: %s", cacheURL)
envs["ACTIONS_CACHE_URL"] = cacheURL + "/"
}
}
}
// set artifact gitea api
artifactGiteaAPI := strings.TrimSuffix(cli.Address(), "/") + "/api/actions_pipeline/"
envs["ACTIONS_RUNTIME_URL"] = artifactGiteaAPI
envs["ACTIONS_RESULTS_URL"] = strings.TrimSuffix(cli.Address(), "/")
// Set specific environments to distinguish between Gitea and GitHub
envs["GITEA_ACTIONS"] = "true"
envs["GITEA_ACTIONS_RUNNER_VERSION"] = ver.Version()
return &Runner{
name: reg.Name,
cfg: cfg,
client: cli,
labels: ls,
envs: envs,
}
}
func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
if _, ok := r.runningTasks.Load(task.Id); ok {
return fmt.Errorf("task %d is already running", task.Id)
}
r.runningTasks.Store(task.Id, struct{}{})
defer r.runningTasks.Delete(task.Id)
ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout)
defer cancel()
reporter := report.NewReporter(ctx, cancel, r.client, task)
var runErr error
defer func() {
lastWords := ""
if runErr != nil {
lastWords = runErr.Error()
}
_ = reporter.Close(lastWords)
}()
reporter.RunDaemon()
runErr = r.run(ctx, task, reporter)
return nil
}
// getDefaultActionsURL
// when DEFAULT_ACTIONS_URL == "https://github.com" and GithubMirror is not blank,
// it should be set to GithubMirror first.
func (r *Runner) getDefaultActionsURL(ctx context.Context, task *runnerv1.Task) string {
giteaDefaultActionsURL := task.Context.Fields["gitea_default_actions_url"].GetStringValue()
if giteaDefaultActionsURL == "https://github.com" && r.cfg.Runner.GithubMirror != "" {
return r.cfg.Runner.GithubMirror
}
return giteaDefaultActionsURL
}
func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.Reporter) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()
reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue())
workflow, jobID, err := generateWorkflow(task)
if err != nil {
return err
}
plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID)
if err != nil {
return err
}
job := workflow.GetJob(jobID)
reporter.ResetSteps(len(job.Steps))
taskContext := task.Context.Fields
log.Infof("task %v repo is %v %v %v", task.Id, taskContext["repository"].GetStringValue(),
r.getDefaultActionsURL(ctx, task),
r.client.Address())
preset := &model.GithubContext{
Event: taskContext["event"].GetStructValue().AsMap(),
RunID: taskContext["run_id"].GetStringValue(),
RunNumber: taskContext["run_number"].GetStringValue(),
Actor: taskContext["actor"].GetStringValue(),
Repository: taskContext["repository"].GetStringValue(),
EventName: taskContext["event_name"].GetStringValue(),
Sha: taskContext["sha"].GetStringValue(),
Ref: taskContext["ref"].GetStringValue(),
RefName: taskContext["ref_name"].GetStringValue(),
RefType: taskContext["ref_type"].GetStringValue(),
HeadRef: taskContext["head_ref"].GetStringValue(),
BaseRef: taskContext["base_ref"].GetStringValue(),
Token: taskContext["token"].GetStringValue(),
RepositoryOwner: taskContext["repository_owner"].GetStringValue(),
RetentionDays: taskContext["retention_days"].GetStringValue(),
}
if t := task.Secrets["GITEA_TOKEN"]; t != "" {
preset.Token = t
} else if t := task.Secrets["GITHUB_TOKEN"]; t != "" {
preset.Token = t
}
if actionsIdTokenRequestUrl := taskContext["actions_id_token_request_url"].GetStringValue(); actionsIdTokenRequestUrl != "" {
r.envs["ACTIONS_ID_TOKEN_REQUEST_URL"] = actionsIdTokenRequestUrl
r.envs["ACTIONS_ID_TOKEN_REQUEST_TOKEN"] = taskContext["actions_id_token_request_token"].GetStringValue()
task.Secrets["ACTIONS_ID_TOKEN_REQUEST_TOKEN"] = r.envs["ACTIONS_ID_TOKEN_REQUEST_TOKEN"]
}
giteaRuntimeToken := taskContext["gitea_runtime_token"].GetStringValue()
if giteaRuntimeToken == "" {
// use task token to action api token for previous Gitea Server Versions
giteaRuntimeToken = preset.Token
}
r.envs["ACTIONS_RUNTIME_TOKEN"] = giteaRuntimeToken
eventJSON, err := json.Marshal(preset.Event)
if err != nil {
return err
}
maxLifetime := 3 * time.Hour
if deadline, ok := ctx.Deadline(); ok {
maxLifetime = time.Until(deadline)
}
runnerConfig := &runner.Config{
// On Linux, Workdir will be like "/<parent_directory>/<owner>/<repo>"
// On Windows, Workdir will be like "\<parent_directory>\<owner>\<repo>"
Workdir: filepath.FromSlash(fmt.Sprintf("/%s/%s", strings.TrimLeft(r.cfg.Container.WorkdirParent, "/"), preset.Repository)),
BindWorkdir: false,
ActionCacheDir: filepath.FromSlash(r.cfg.Host.WorkdirParent),
ReuseContainers: false,
ForcePull: r.cfg.Container.ForcePull,
ForceRebuild: r.cfg.Container.ForceRebuild,
LogOutput: true,
JSONLogger: false,
Env: r.envs,
Secrets: task.Secrets,
GitHubInstance: strings.TrimSuffix(r.client.Address(), "/"),
AutoRemove: true,
NoSkipCheckout: true,
PresetGitHubContext: preset,
EventJSON: string(eventJSON),
ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%d", task.Id),
ContainerMaxLifetime: maxLifetime,
ContainerNetworkMode: container.NetworkMode(r.cfg.Container.Network),
ContainerOptions: r.cfg.Container.Options,
ContainerDaemonSocket: r.cfg.Container.DockerHost,
Privileged: r.cfg.Container.Privileged,
DefaultActionInstance: r.getDefaultActionsURL(ctx, task),
PlatformPicker: r.labels.PickPlatform,
Vars: task.Vars,
ValidVolumes: r.cfg.Container.ValidVolumes,
InsecureSkipTLS: r.cfg.Runner.Insecure,
}
rr, err := runner.New(runnerConfig)
if err != nil {
return err
}
executor := rr.NewPlanExecutor(plan)
reporter.Logf("workflow prepared")
// add logger recorders
ctx = common.WithLoggerHook(ctx, reporter)
if !log.IsLevelEnabled(log.DebugLevel) {
ctx = runner.WithJobLoggerFactory(ctx, NullLogger{})
}
execErr := executor(ctx)
reporter.SetOutputs(job.Outputs)
return execErr
}
func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Response[runnerv1.DeclareResponse], error) {
return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{
Version: ver.Version(),
Labels: labels,
}))
}
// getGatewayIP 获取宿主机在容器网络中的网关IP
// job容器可以通过网关IP访问宿主机上的服务
func getGatewayIP(cfg *config.Config) string {
// 如果配置了固定的缓存host使用它
if cfg.Cache.Host != "" && cfg.Cache.Host != "0.0.0.0" && cfg.Cache.Host != "::" {
return cfg.Cache.Host
}
// 根据Docker主机类型返回合适的网关IP
// 1. 如果使用 unix socketDocker在宿主机上job容器通过网关访问
// 2. 大多数Docker网络的网关是 x.x.x.1
// 3. 常见的Docker bridge网络
// - docker0 (默认): 172.17.0.1
// - 自定义网络: 172.x.x.1 或 192.168.x.x.1
//
// 使用172.17.0.1作为默认值这是Docker默认bridge网络的网关
// job容器创建的独立网络通常也是172.x.x.1网关
return "172.17.0.1"
}