diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 59eb868..8b82908 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -58,7 +58,7 @@ builds: flags: - -trimpath ldflags: - - -s -w -X gitea.com/gitea/act_runner/cmd.version={{ .Summary }} + - -s -w -X gitea.com/gitea/act_runner/internal/pkg/ver.version={{ .Summary }} binary: >- {{ .ProjectName }}- {{- .Version }}- diff --git a/Makefile b/Makefile index e7e0d39..139cc83 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,6 @@ HAS_GO = $(shell hash $(GO) > /dev/null 2>&1 && echo "GO" || echo "NOGO" ) XGO_PACKAGE ?= src.techknowlogick.com/xgo@latest XGO_VERSION := go-1.18.x GXZ_PAGAGE ?= github.com/ulikunitz/xz/cmd/gxz@v0.5.10 -RUNNER_CMD_PACKAGE_PATH := gitea.com/gitea/act_runner/cmd LINUX_ARCHS ?= linux/amd64,linux/arm64 DARWIN_ARCHS ?= darwin-12/amd64,darwin-12/arm64 @@ -63,7 +62,7 @@ else endif TAGS ?= -LDFLAGS ?= -X "$(RUNNER_CMD_PACKAGE_PATH).version=$(RELASE_VERSION)" +LDFLAGS ?= -X "gitea.com/gitea/act_runner/internal/pkg/ver.version=$(RELASE_VERSION)" all: build diff --git a/artifactcache/README.md b/artifactcache/README.md deleted file mode 100644 index 0a84d84..0000000 --- a/artifactcache/README.md +++ /dev/null @@ -1,7 +0,0 @@ -Inspired by: -https://github.com/sp-ricard-valverde/github-act-cache-server - -TODO: -- Authorization -- [Restrictions for accessing a cache](https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#restrictions-for-accessing-a-cache) -- [Force deleting cache entries](https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries) diff --git a/cmd/daemon.go b/cmd/daemon.go deleted file mode 100644 index e826970..0000000 --- a/cmd/daemon.go +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2022 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package cmd - -import ( - "context" - "fmt" - "os" - - "github.com/mattn/go-isatty" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "golang.org/x/sync/errgroup" - - "gitea.com/gitea/act_runner/artifactcache" - "gitea.com/gitea/act_runner/client" - "gitea.com/gitea/act_runner/config" - "gitea.com/gitea/act_runner/engine" - "gitea.com/gitea/act_runner/poller" - "gitea.com/gitea/act_runner/runtime" -) - -func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error { - return func(cmd *cobra.Command, args []string) error { - log.Infoln("Starting runner daemon") - - cfg, err := config.LoadDefault(*configFile) - if err != nil { - return fmt.Errorf("invalid configuration: %w", err) - } - - initLogging(cfg) - - reg, err := config.LoadRegistration(cfg.Runner.File) - if os.IsNotExist(err) { - log.Error("registration file not found, please register the runner first") - return err - } else if err != nil { - return fmt.Errorf("failed to load registration file: %w", err) - } - - // require docker if a runner label uses a docker backend - needsDocker := false - for _, l := range reg.Labels { - _, schema, _, _ := runtime.ParseLabel(l) - if schema == "docker" { - needsDocker = true - break - } - } - - if needsDocker { - // try to connect to docker daemon - // if failed, exit with error - if err := engine.Start(ctx); err != nil { - log.WithError(err).Fatalln("failed to connect docker daemon engine") - } - } - - var g errgroup.Group - - cli := client.New( - reg.Address, - cfg.Runner.Insecure, - reg.UUID, - reg.Token, - version, - ) - - runner := &runtime.Runner{ - Client: cli, - Machine: reg.Name, - ForgeInstance: reg.Address, - Environ: cfg.Runner.Envs, - Labels: reg.Labels, - Network: cfg.Container.Network, - Version: version, - } - - if *cfg.Cache.Enabled { - if handler, err := artifactcache.NewHandler(cfg.Cache.Dir, cfg.Cache.Host, cfg.Cache.Port); err != nil { - log.Errorf("cannot init cache server, it will be disabled: %v", err) - } else { - log.Infof("cache handler listens on: %v", handler.ExternalURL()) - runner.CacheHandler = handler - } - } - - poller := poller.New( - cli, - runner.Run, - cfg, - ) - - g.Go(func() error { - l := log.WithField("capacity", cfg.Runner.Capacity). - WithField("endpoint", reg.Address) - l.Infoln("polling the remote server") - - if err := poller.Poll(ctx); err != nil { - l.Errorf("poller error: %v", err) - } - poller.Wait() - return nil - }) - - err = g.Wait() - if err != nil { - log.WithError(err). - Errorln("shutting down the server") - } - return err - } -} - -// initLogging setup the global logrus logger. -func initLogging(cfg *config.Config) { - isTerm := isatty.IsTerminal(os.Stdout.Fd()) - log.SetFormatter(&log.TextFormatter{ - DisableColors: !isTerm, - FullTimestamp: true, - }) - - if l := cfg.Log.Level; l != "" { - level, err := log.ParseLevel(l) - if err != nil { - log.WithError(err). - Errorf("invalid log level: %q", l) - } - if log.GetLevel() != level { - log.Infof("log level changed to %v", level) - log.SetLevel(level) - } - } -} diff --git a/engine/docker.go b/engine/docker.go deleted file mode 100644 index 7f89ffc..0000000 --- a/engine/docker.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2022 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package engine - -import ( - "context" - - "github.com/docker/docker/client" -) - -type Docker struct { - client client.APIClient - hidePull bool -} - -func New(opts ...Option) (*Docker, error) { - cli, err := client.NewClientWithOpts(client.FromEnv) - if err != nil { - return nil, err - } - - srv := &Docker{ - client: cli, - } - - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - opt.Apply(srv) - } - - return srv, nil -} - -// Ping pings the Docker daemon. -func (e *Docker) Ping(ctx context.Context) error { - _, err := e.client.Ping(ctx) - return err -} diff --git a/engine/engine.go b/engine/engine.go deleted file mode 100644 index 5580416..0000000 --- a/engine/engine.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2022 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package engine - -import ( - "context" - "fmt" - "time" - - log "github.com/sirupsen/logrus" -) - -// Start start docker engine api loop -func Start(ctx context.Context) error { - engine, err := New() - if err != nil { - return err - } - - count := 0 - for { - err := engine.Ping(ctx) - if err == context.Canceled { - break - } - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - if err != nil { - log.WithError(err). - Errorln("cannot ping the docker daemon") - count++ - if count == 5 { - return fmt.Errorf("retry connect to docker daemon failed: %d times", count) - } - time.Sleep(time.Second) - } else { - log.Infoln("successfully ping the docker daemon") - break - } - } - return nil -} diff --git a/engine/options.go b/engine/options.go deleted file mode 100644 index 7c81a6d..0000000 --- a/engine/options.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2022 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package engine - -import "github.com/docker/docker/client" - -// An Option configures a mutex. -type Option interface { - Apply(*Docker) -} - -// OptionFunc is a function that configure a value. -type OptionFunc func(*Docker) - -// Apply calls f(option) -func (f OptionFunc) Apply(docker *Docker) { - f(docker) -} - -// WithClient set custom client -func WithClient(c client.APIClient) Option { - return OptionFunc(func(q *Docker) { - q.client = c - }) -} - -// WithHidePull hide pull event. -func WithHidePull(v bool) Option { - return OptionFunc(func(q *Docker) { - q.hidePull = v - }) -} diff --git a/go.mod b/go.mod index 23f10bb..ff2e49d 100644 --- a/go.mod +++ b/go.mod @@ -15,10 +15,12 @@ require ( github.com/nektos/act v0.0.0 github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.6.1 - golang.org/x/sync v0.1.0 + github.com/stretchr/testify v1.8.1 golang.org/x/term v0.6.0 + golang.org/x/time v0.1.0 google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v3 v3.0.1 + gotest.tools/v3 v3.4.0 modernc.org/sqlite v1.14.2 xorm.io/builder v0.3.11-0.20220531020008-1bd24a7dc978 xorm.io/xorm v1.3.2 @@ -33,6 +35,7 @@ require ( github.com/ajg/form v1.5.1 // indirect github.com/containerd/containerd v1.6.18 // indirect github.com/creack/pty v1.1.18 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/cli v23.0.1+incompatible // indirect github.com/docker/distribution v2.8.1+incompatible // indirect github.com/docker/docker-credential-helpers v0.7.0 // indirect @@ -46,6 +49,7 @@ require ( github.com/goccy/go-json v0.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/go-cmp v0.5.9 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/uuid v1.3.0 // indirect github.com/imdario/mergo v0.3.13 // indirect @@ -72,6 +76,7 @@ require ( github.com/opencontainers/runc v1.1.3 // indirect github.com/opencontainers/selinux v1.11.0 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/rhysd/actionlint v1.6.23 // indirect github.com/rivo/uniseg v0.4.3 // indirect @@ -86,6 +91,7 @@ require ( golang.org/x/crypto v0.2.0 // indirect golang.org/x/mod v0.4.2 // indirect golang.org/x/net v0.7.0 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.6.0 // indirect golang.org/x/tools v0.1.5 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect diff --git a/go.sum b/go.sum index 2205b74..9d62e0a 100644 --- a/go.sum +++ b/go.sum @@ -193,6 +193,7 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= @@ -655,6 +656,7 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201126233918-771906719818/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -684,6 +686,7 @@ golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA= +golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -704,6 +707,7 @@ golang.org/x/tools v0.0.0-20200325010219-a49f79bcc224/go.mod h1:Sl4aGygMT6LrqrWc golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -770,6 +774,7 @@ gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o= +gotest.tools/v3 v3.4.0/go.mod h1:CtbdzLSsqVhDgMtKsx03ird5YTGB3ar27v0u/yKBW5g= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/app/artifactcache/doc.go b/internal/app/artifactcache/doc.go new file mode 100644 index 0000000..a9c6a47 --- /dev/null +++ b/internal/app/artifactcache/doc.go @@ -0,0 +1,12 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Package artifactcache provides a cache handler for the runner. +// +// Inspired by https://github.com/sp-ricard-valverde/github-act-cache-server +// +// TODO: Authorization +// TODO: Restrictions for accessing a cache, see https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#restrictions-for-accessing-a-cache +// TODO: Force deleting cache entries, see https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#force-deleting-cache-entries + +package artifactcache diff --git a/artifactcache/handler.go b/internal/app/artifactcache/handler.go similarity index 99% rename from artifactcache/handler.go rename to internal/app/artifactcache/handler.go index 86e7aa5..2e063fc 100644 --- a/artifactcache/handler.go +++ b/internal/app/artifactcache/handler.go @@ -42,7 +42,7 @@ type Handler struct { outboundIP string } -func NewHandler(dir, outboundIP string, port uint16) (*Handler, error) { +func StartHandler(dir, outboundIP string, port uint16) (*Handler, error) { h := &Handler{} if dir == "" { diff --git a/artifactcache/model.go b/internal/app/artifactcache/model.go similarity index 100% rename from artifactcache/model.go rename to internal/app/artifactcache/model.go diff --git a/artifactcache/storage.go b/internal/app/artifactcache/storage.go similarity index 100% rename from artifactcache/storage.go rename to internal/app/artifactcache/storage.go diff --git a/artifactcache/util.go b/internal/app/artifactcache/util.go similarity index 100% rename from artifactcache/util.go rename to internal/app/artifactcache/util.go diff --git a/cmd/cmd.go b/internal/app/cmd/cmd.go similarity index 94% rename from cmd/cmd.go rename to internal/app/cmd/cmd.go index 375fc32..f6d2bda 100644 --- a/cmd/cmd.go +++ b/internal/app/cmd/cmd.go @@ -10,19 +10,17 @@ import ( "github.com/spf13/cobra" - "gitea.com/gitea/act_runner/config" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/ver" ) -// the version of act_runner -var version = "develop" - func Execute(ctx context.Context) { // ./act_runner rootCmd := &cobra.Command{ Use: "act_runner [event name to run]\nIf no event name passed, will default to \"on: push\"", Short: "Run GitHub actions locally by specifying the event name (e.g. `push`) or an action name directly.", Args: cobra.MaximumNArgs(1), - Version: version, + Version: ver.Version(), SilenceUsage: true, } configFile := "" diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go new file mode 100644 index 0000000..a648d64 --- /dev/null +++ b/internal/app/cmd/daemon.go @@ -0,0 +1,98 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/mattn/go-isatty" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "gitea.com/gitea/act_runner/internal/app/poll" + "gitea.com/gitea/act_runner/internal/app/run" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/envcheck" + "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/ver" +) + +func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error { + return func(cmd *cobra.Command, args []string) error { + log.Infoln("Starting runner daemon") + + cfg, err := config.LoadDefault(*configFile) + if err != nil { + return fmt.Errorf("invalid configuration: %w", err) + } + + initLogging(cfg) + + reg, err := config.LoadRegistration(cfg.Runner.File) + if os.IsNotExist(err) { + log.Error("registration file not found, please register the runner first") + return err + } else if err != nil { + return fmt.Errorf("failed to load registration file: %w", err) + } + + ls := labels.Labels{} + for _, l := range reg.Labels { + label, err := labels.Parse(l) + if err != nil { + log.WithError(err).Warnf("ignored invalid label %q", l) + continue + } + ls = append(ls, label) + } + if len(ls) == 0 { + log.Warn("no labels configured, runner may not be able to pick up jobs") + } + + if ls.RequireDocker() { + if err := envcheck.CheckIfDockerRunning(ctx); err != nil { + return err + } + } + + cli := client.New( + reg.Address, + cfg.Runner.Insecure, + reg.UUID, + reg.Token, + ver.Version(), + ) + + runner := run.NewRunner(cfg, reg, cli) + poller := poll.New(cfg, cli, runner) + + poller.Poll(ctx) + + return nil + } +} + +// initLogging setup the global logrus logger. +func initLogging(cfg *config.Config) { + isTerm := isatty.IsTerminal(os.Stdout.Fd()) + log.SetFormatter(&log.TextFormatter{ + DisableColors: !isTerm, + FullTimestamp: true, + }) + + if l := cfg.Log.Level; l != "" { + level, err := log.ParseLevel(l) + if err != nil { + log.WithError(err). + Errorf("invalid log level: %q", l) + } + if log.GetLevel() != level { + log.Infof("log level changed to %v", level) + log.SetLevel(level) + } + } +} diff --git a/cmd/exec.go b/internal/app/cmd/exec.go similarity index 99% rename from cmd/exec.go rename to internal/app/cmd/exec.go index a3b5b00..6cb87de 100644 --- a/cmd/exec.go +++ b/internal/app/cmd/exec.go @@ -22,7 +22,7 @@ import ( "github.com/spf13/cobra" "golang.org/x/term" - "gitea.com/gitea/act_runner/artifactcache" + "gitea.com/gitea/act_runner/internal/app/artifactcache" ) type executeArgs struct { @@ -348,7 +348,7 @@ func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command } // init a cache server - handler, err := artifactcache.NewHandler("", "", 0) + handler, err := artifactcache.StartHandler("", "", 0) if err != nil { return err } diff --git a/cmd/register.go b/internal/app/cmd/register.go similarity index 94% rename from cmd/register.go rename to internal/app/cmd/register.go index ef06a31..51318e8 100644 --- a/cmd/register.go +++ b/internal/app/cmd/register.go @@ -20,9 +20,10 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "gitea.com/gitea/act_runner/client" - "gitea.com/gitea/act_runner/config" - "gitea.com/gitea/act_runner/runtime" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/ver" ) // runRegister registers a runner to the server @@ -37,7 +38,7 @@ func runRegister(ctx context.Context, regArgs *registerArgs, configFile *string) log.SetLevel(log.DebugLevel) log.Infof("Registering runner, arch=%s, os=%s, version=%s.", - goruntime.GOARCH, goruntime.GOOS, version) + goruntime.GOARCH, goruntime.GOOS, ver.Version()) // runner always needs root permission if os.Getuid() != 0 { @@ -116,9 +117,9 @@ func (r *registerInputs) validate() error { return nil } -func validateLabels(labels []string) error { - for _, label := range labels { - if _, _, _, err := runtime.ParseLabel(label); err != nil { +func validateLabels(ls []string) error { + for _, label := range ls { + if _, err := labels.Parse(label); err != nil { return err } } @@ -272,7 +273,7 @@ func doRegister(cfg *config.Config, inputs *registerInputs) error { cfg.Runner.Insecure, "", "", - version, + ver.Version(), ) for { @@ -305,16 +306,16 @@ func doRegister(cfg *config.Config, inputs *registerInputs) error { Labels: inputs.CustomLabels, } - labels := make([]string, len(reg.Labels)) + ls := make([]string, len(reg.Labels)) for i, v := range reg.Labels { - l, _, _, _ := runtime.ParseLabel(v) - labels[i] = l + l, _ := labels.Parse(v) + ls[i] = l.Name } // register new runner. resp, err := cli.Register(ctx, connect.NewRequest(&runnerv1.RegisterRequest{ Name: reg.Name, Token: reg.Token, - AgentLabels: labels, + AgentLabels: ls, })) if err != nil { log.WithError(err).Error("poller: cannot register new runner") diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go new file mode 100644 index 0000000..29c4e33 --- /dev/null +++ b/internal/app/poll/poller.go @@ -0,0 +1,82 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package poll + +import ( + "context" + "errors" + "sync" + "time" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/bufbuild/connect-go" + log "github.com/sirupsen/logrus" + "golang.org/x/time/rate" + + "gitea.com/gitea/act_runner/internal/app/run" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" +) + +type Poller struct { + client client.Client + runner *run.Runner + capacity int +} + +func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { + return &Poller{ + client: client, + runner: runner, + capacity: cfg.Runner.Capacity, + } +} + +func (p *Poller) Poll(ctx context.Context) { + limiter := rate.NewLimiter(rate.Every(2*time.Second), 1) + wg := &sync.WaitGroup{} + for i := 0; i < p.capacity; i++ { + wg.Add(1) + go p.poll(ctx, wg, limiter) + } + wg.Wait() +} + +func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) { + defer wg.Done() + for { + if err := limiter.Wait(ctx); err != nil { + if ctx.Err() != nil { + log.WithError(err).Debug("limiter wait failed") + } + return + } + task, ok := p.fetchTask(ctx) + if !ok { + continue + } + if err := p.runner.Run(ctx, task); err != nil { + log.WithError(err).Error("failed to run task") + } + } +} + +func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) { + reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{})) + if errors.Is(err, context.DeadlineExceeded) { + err = nil + } + if err != nil { + log.WithError(err).Error("failed to fetch task") + return nil, false + } + + if resp.Msg.Task == nil { + return nil, false + } + return resp.Msg.Task, true +} diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go new file mode 100644 index 0000000..6e31373 --- /dev/null +++ b/internal/app/run/runner.go @@ -0,0 +1,199 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package run + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "path/filepath" + "sync" + "time" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + "github.com/nektos/act/pkg/common" + "github.com/nektos/act/pkg/model" + "github.com/nektos/act/pkg/runner" + log "github.com/sirupsen/logrus" + + "gitea.com/gitea/act_runner/internal/app/artifactcache" + "gitea.com/gitea/act_runner/internal/pkg/client" + "gitea.com/gitea/act_runner/internal/pkg/config" + "gitea.com/gitea/act_runner/internal/pkg/labels" + "gitea.com/gitea/act_runner/internal/pkg/report" + "gitea.com/gitea/act_runner/internal/pkg/ver" +) + +// Runner runs the pipeline. +type Runner struct { + name string + + cfg *config.Config + + client client.Client + labels labels.Labels + envs map[string]string + + runningTasks sync.Map +} + +func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client) *Runner { + ls := labels.Labels{} + for _, v := range reg.Labels { + if l, err := labels.Parse(v); err == nil { + ls = append(ls, l) + } + } + envs := make(map[string]string, len(cfg.Runner.Envs)) + for k, v := range cfg.Runner.Envs { + envs[k] = v + } + if cfg.Cache.Enabled == nil || *cfg.Cache.Enabled { + cacheHandler, err := artifactcache.StartHandler(cfg.Cache.Dir, cfg.Cache.Host, cfg.Cache.Port) + if err != nil { + log.Errorf("cannot init cache server, it will be disabled: %v", err) + // go on + } else { + envs["ACTIONS_CACHE_URL"] = cacheHandler.ExternalURL() + "/" + } + } + + return &Runner{ + name: reg.Name, + cfg: cfg, + client: cli, + labels: ls, + envs: envs, + } +} + +func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error { + if _, ok := r.runningTasks.Load(task.Id); ok { + return fmt.Errorf("task %d is already running", task.Id) + } else { + r.runningTasks.Store(task.Id, struct{}{}) + defer r.runningTasks.Delete(task.Id) + } + + ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout) + defer cancel() + reporter := report.NewReporter(ctx, cancel, r.client, task) + var runErr error + defer func() { + lastWords := "" + if runErr != nil { + lastWords = runErr.Error() + } + _ = reporter.Close(lastWords) + }() + reporter.RunDaemon() + runErr = r.run(ctx, task, reporter) + + return nil +} + +func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.Reporter) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue()) + + workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload)) + if err != nil { + return err + } + + jobIDs := workflow.GetJobIDs() + if len(jobIDs) != 1 { + return fmt.Errorf("multiple jobs found: %v", jobIDs) + } + jobID := jobIDs[0] + plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID) + if err != nil { + return err + } + job := workflow.GetJob(jobID) + reporter.ResetSteps(len(job.Steps)) + + taskContext := task.Context.Fields + + log.Infof("task %v repo is %v %v %v", task.Id, taskContext["repository"].GetStringValue(), + taskContext["gitea_default_actions_url"].GetStringValue(), + r.client.Address()) + + preset := &model.GithubContext{ + Event: taskContext["event"].GetStructValue().AsMap(), + RunID: taskContext["run_id"].GetStringValue(), + RunNumber: taskContext["run_number"].GetStringValue(), + Actor: taskContext["actor"].GetStringValue(), + Repository: taskContext["repository"].GetStringValue(), + EventName: taskContext["event_name"].GetStringValue(), + Sha: taskContext["sha"].GetStringValue(), + Ref: taskContext["ref"].GetStringValue(), + RefName: taskContext["ref_name"].GetStringValue(), + RefType: taskContext["ref_type"].GetStringValue(), + HeadRef: taskContext["head_ref"].GetStringValue(), + BaseRef: taskContext["base_ref"].GetStringValue(), + Token: taskContext["token"].GetStringValue(), + RepositoryOwner: taskContext["repository_owner"].GetStringValue(), + RetentionDays: taskContext["retention_days"].GetStringValue(), + } + if t := task.Secrets["GITEA_TOKEN"]; t != "" { + preset.Token = t + } else if t := task.Secrets["GITHUB_TOKEN"]; t != "" { + preset.Token = t + } + + eventJSON, err := json.Marshal(preset.Event) + if err != nil { + return err + } + + maxLifetime := 3 * time.Hour + if deadline, ok := ctx.Deadline(); ok { + maxLifetime = time.Until(deadline) + } + + runnerConfig := &runner.Config{ + // On Linux, Workdir will be like "//" + // On Windows, Workdir will be like "\\" + Workdir: filepath.FromSlash(string(filepath.Separator) + preset.Repository), + BindWorkdir: false, + + ReuseContainers: false, + ForcePull: false, + ForceRebuild: false, + LogOutput: true, + JSONLogger: false, + Env: r.envs, + Secrets: task.Secrets, + GitHubInstance: r.client.Address(), + AutoRemove: true, + NoSkipCheckout: true, + PresetGitHubContext: preset, + EventJSON: string(eventJSON), + ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%d", task.Id), + ContainerMaxLifetime: maxLifetime, + ContainerNetworkMode: r.cfg.Container.NetworkMode, + DefaultActionInstance: taskContext["gitea_default_actions_url"].GetStringValue(), + PlatformPicker: r.labels.PickPlatform, + } + + rr, err := runner.New(runnerConfig) + if err != nil { + return err + } + executor := rr.NewPlanExecutor(plan) + + reporter.Logf("workflow prepared") + + // add logger recorders + ctx = common.WithLoggerHook(ctx, reporter) + + return executor(ctx) +} diff --git a/client/client.go b/internal/pkg/client/client.go similarity index 100% rename from client/client.go rename to internal/pkg/client/client.go diff --git a/client/header.go b/internal/pkg/client/header.go similarity index 100% rename from client/header.go rename to internal/pkg/client/header.go diff --git a/client/http.go b/internal/pkg/client/http.go similarity index 89% rename from client/http.go rename to internal/pkg/client/http.go index fc374f4..cc0c44e 100644 --- a/client/http.go +++ b/internal/pkg/client/http.go @@ -28,7 +28,7 @@ func getHttpClient(endpoint string, insecure bool) *http.Client { } // New returns a new runner client. -func New(endpoint string, insecure bool, uuid, token, runnerVersion string, opts ...connect.ClientOption) *HTTPClient { +func New(endpoint string, insecure bool, uuid, token, version string, opts ...connect.ClientOption) *HTTPClient { baseURL := strings.TrimRight(endpoint, "/") + "/api/actions" opts = append(opts, connect.WithInterceptors(connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { @@ -39,8 +39,8 @@ func New(endpoint string, insecure bool, uuid, token, runnerVersion string, opts if token != "" { req.Header().Set(TokenHeader, token) } - if runnerVersion != "" { - req.Header().Set(VersionHeader, runnerVersion) + if version != "" { + req.Header().Set(VersionHeader, version) } return next(ctx, req) } diff --git a/config/config.example.yaml b/internal/pkg/config/config.example.yaml similarity index 91% rename from config/config.example.yaml rename to internal/pkg/config/config.example.yaml index 1f05f68..871b346 100644 --- a/config/config.example.yaml +++ b/internal/pkg/config/config.example.yaml @@ -38,5 +38,5 @@ cache: port: 0 container: - # Which network to use for the job containers. - network: bridge + # Which network to use for the job containers. Could be bridge, host, none, or the name of a custom network. + network_mode: bridge diff --git a/config/config.go b/internal/pkg/config/config.go similarity index 94% rename from config/config.go rename to internal/pkg/config/config.go index 34f181c..6280f47 100644 --- a/config/config.go +++ b/internal/pkg/config/config.go @@ -32,7 +32,7 @@ type Config struct { Port uint16 `yaml:"port"` } `yaml:"cache"` Container struct { - Network string `yaml:"network"` + NetworkMode string `yaml:"network_mode"` } } @@ -87,8 +87,8 @@ func LoadDefault(file string) (*Config, error) { cfg.Cache.Dir = filepath.Join(home, ".cache", "actcache") } } - if cfg.Container.Network == "" { - cfg.Container.Network = "bridge" + if cfg.Container.NetworkMode == "" { + cfg.Container.NetworkMode = "bridge" } return cfg, nil diff --git a/config/deprecated.go b/internal/pkg/config/deprecated.go similarity index 100% rename from config/deprecated.go rename to internal/pkg/config/deprecated.go diff --git a/config/embed.go b/internal/pkg/config/embed.go similarity index 100% rename from config/embed.go rename to internal/pkg/config/embed.go diff --git a/config/registration.go b/internal/pkg/config/registration.go similarity index 100% rename from config/registration.go rename to internal/pkg/config/registration.go diff --git a/internal/pkg/envcheck/doc.go b/internal/pkg/envcheck/doc.go new file mode 100644 index 0000000..8641a77 --- /dev/null +++ b/internal/pkg/envcheck/doc.go @@ -0,0 +1,5 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Package envcheck provides a simple way to check if the environment is ready to run jobs. +package envcheck diff --git a/internal/pkg/envcheck/docker.go b/internal/pkg/envcheck/docker.go new file mode 100644 index 0000000..841ca23 --- /dev/null +++ b/internal/pkg/envcheck/docker.go @@ -0,0 +1,27 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package envcheck + +import ( + "context" + "fmt" + + "github.com/docker/docker/client" +) + +func CheckIfDockerRunning(ctx context.Context) error { + // TODO: if runner support configures to use docker, we need config.Config to pass in + cli, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + return err + } + defer cli.Close() + + _, err = cli.Ping(ctx) + if err != nil { + return fmt.Errorf("cannot ping the docker daemon, does it running? %w", err) + } + + return nil +} diff --git a/internal/pkg/labels/labels.go b/internal/pkg/labels/labels.go new file mode 100644 index 0000000..0848222 --- /dev/null +++ b/internal/pkg/labels/labels.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package labels + +import ( + "fmt" + "strings" +) + +const ( + SchemeHost = "host" + SchemeDocker = "docker" +) + +type Label struct { + Name string + Schema string + Arg string +} + +func Parse(str string) (*Label, error) { + splits := strings.SplitN(str, ":", 3) + label := &Label{ + Name: splits[0], + Schema: "host", + Arg: "", + } + if len(splits) >= 2 { + label.Schema = splits[1] + } + if len(splits) >= 3 { + label.Arg = splits[2] + } + if label.Schema != SchemeHost && label.Schema != SchemeDocker { + return nil, fmt.Errorf("unsupported schema: %s", label.Schema) + } + return label, nil +} + +type Labels []*Label + +func (l Labels) RequireDocker() bool { + for _, label := range l { + if label.Schema == SchemeDocker { + return true + } + } + return false +} + +func (l Labels) PickPlatform(runsOn []string) string { + platforms := make(map[string]string, len(l)) + for _, label := range l { + switch label.Schema { + case SchemeDocker: + // "//" will be ignored + // TODO maybe we should use 'ubuntu-18.04:docker:node:16-buster' instead + platforms[label.Name] = strings.TrimPrefix(label.Arg, "//") + case SchemeHost: + platforms[label.Name] = "-self-hosted" + default: + // It should not happen, because Parse has checked it. + continue + } + } + for _, v := range runsOn { + if v, ok := platforms[v]; ok { + return v + } + } + + // TODO: support multiple labels + // like: + // ["ubuntu-22.04"] => "ubuntu:22.04" + // ["with-gpu"] => "linux:with-gpu" + // ["ubuntu-22.04", "with-gpu"] => "ubuntu:22.04_with-gpu" + + // return default. + // So the runner receives a task with a label that the runner doesn't have, + // it happens when the user have edited the label of the runner in the web UI. + // TODO: it may be not correct, what if the runner is used as host mode only? + return "node:16-bullseye" +} diff --git a/internal/pkg/labels/labels_test.go b/internal/pkg/labels/labels_test.go new file mode 100644 index 0000000..dbef1ef --- /dev/null +++ b/internal/pkg/labels/labels_test.go @@ -0,0 +1,64 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package labels + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gotest.tools/v3/assert" +) + +func TestParse(t *testing.T) { + tests := []struct { + args string + want *Label + wantErr bool + }{ + { + args: "ubuntu:docker://node:18", + want: &Label{ + Name: "ubuntu", + Schema: "docker", + Arg: "//node:18", + }, + wantErr: false, + }, + { + args: "ubuntu:host", + want: &Label{ + Name: "ubuntu", + Schema: "host", + Arg: "", + }, + wantErr: false, + }, + { + args: "ubuntu", + want: &Label{ + Name: "ubuntu", + Schema: "host", + Arg: "", + }, + wantErr: false, + }, + { + args: "ubuntu:vm:ubuntu-18.04", + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.args, func(t *testing.T) { + got, err := Parse(tt.args) + if tt.wantErr { + require.Error(t, err) + return + } else { + require.NoError(t, err) + } + assert.DeepEqual(t, got, tt.want) + }) + } +} diff --git a/runtime/reporter.go b/internal/pkg/report/reporter.go similarity index 98% rename from runtime/reporter.go rename to internal/pkg/report/reporter.go index e629435..e6b635a 100644 --- a/runtime/reporter.go +++ b/internal/pkg/report/reporter.go @@ -1,7 +1,7 @@ // Copyright 2022 The Gitea Authors. All rights reserved. // SPDX-License-Identifier: MIT -package runtime +package report import ( "context" @@ -11,13 +11,13 @@ import ( "time" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" - "gitea.com/gitea/act_runner/client" - retry "github.com/avast/retry-go/v4" "github.com/bufbuild/connect-go" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" + + "gitea.com/gitea/act_runner/internal/pkg/client" ) type Reporter struct { @@ -179,6 +179,7 @@ func (r *Reporter) Close(lastWords string) error { v.Result = runnerv1.Result_RESULT_CANCELLED } } + r.state.Result = runnerv1.Result_RESULT_FAILURE r.logRows = append(r.logRows, &runnerv1.LogRow{ Time: timestamppb.Now(), Content: lastWords, diff --git a/internal/pkg/ver/version.go b/internal/pkg/ver/version.go new file mode 100644 index 0000000..3c07a18 --- /dev/null +++ b/internal/pkg/ver/version.go @@ -0,0 +1,11 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package ver + +// go build -ldflags "-X gitea.com/gitea/act_runner/internal/pkg/ver.version=1.2.3" +var version = "dev" + +func Version() string { + return version +} diff --git a/main.go b/main.go index dded5dd..4adbd13 100644 --- a/main.go +++ b/main.go @@ -5,33 +5,15 @@ package main import ( "context" - "os" "os/signal" "syscall" - "gitea.com/gitea/act_runner/cmd" + "gitea.com/gitea/act_runner/internal/app/cmd" ) -func withContextFunc(ctx context.Context, f func()) context.Context { - ctx, cancel := context.WithCancel(ctx) - go func() { - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - defer signal.Stop(c) - - select { - case <-ctx.Done(): - case <-c: - cancel() - f() - } - }() - - return ctx -} - func main() { - ctx := withContextFunc(context.Background(), func() {}) + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() // run the command cmd.Execute(ctx) } diff --git a/poller/metric.go b/poller/metric.go deleted file mode 100644 index 3731b79..0000000 --- a/poller/metric.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2022 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package poller - -import "sync/atomic" - -// Metric interface -type Metric interface { - IncBusyWorker() int64 - DecBusyWorker() int64 - BusyWorkers() int64 -} - -var _ Metric = (*metric)(nil) - -type metric struct { - busyWorkers int64 -} - -// NewMetric for default metric structure -func NewMetric() Metric { - return &metric{} -} - -func (m *metric) IncBusyWorker() int64 { - return atomic.AddInt64(&m.busyWorkers, 1) -} - -func (m *metric) DecBusyWorker() int64 { - return atomic.AddInt64(&m.busyWorkers, -1) -} - -func (m *metric) BusyWorkers() int64 { - return atomic.LoadInt64(&m.busyWorkers) -} diff --git a/poller/poller.go b/poller/poller.go deleted file mode 100644 index 228f6e2..0000000 --- a/poller/poller.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright 2022 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package poller - -import ( - "context" - "errors" - "sync" - "time" - - runnerv1 "code.gitea.io/actions-proto-go/runner/v1" - "github.com/bufbuild/connect-go" - log "github.com/sirupsen/logrus" - - "gitea.com/gitea/act_runner/client" - "gitea.com/gitea/act_runner/config" -) - -var ErrDataLock = errors.New("Data Lock Error") - -func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error, cfg *config.Config) *Poller { - return &Poller{ - Client: cli, - Dispatch: dispatch, - routineGroup: newRoutineGroup(), - metric: &metric{}, - ready: make(chan struct{}, 1), - cfg: cfg, - } -} - -type Poller struct { - Client client.Client - Dispatch func(context.Context, *runnerv1.Task) error - - sync.Mutex - routineGroup *routineGroup - metric *metric - ready chan struct{} - cfg *config.Config -} - -func (p *Poller) schedule() { - p.Lock() - defer p.Unlock() - if int(p.metric.BusyWorkers()) >= p.cfg.Runner.Capacity { - return - } - - select { - case p.ready <- struct{}{}: - default: - } -} - -func (p *Poller) Wait() { - p.routineGroup.Wait() -} - -func (p *Poller) handle(ctx context.Context, l *log.Entry) { - defer func() { - if r := recover(); r != nil { - l.Errorf("handle task panic: %+v", r) - } - }() - - for { - select { - case <-ctx.Done(): - return - default: - task, err := p.pollTask(ctx) - if task == nil || err != nil { - if err != nil { - l.Errorf("can't find the task: %v", err.Error()) - } - time.Sleep(5 * time.Second) - break - } - - p.metric.IncBusyWorker() - p.routineGroup.Run(func() { - defer p.schedule() - defer p.metric.DecBusyWorker() - if err := p.dispatchTask(ctx, task); err != nil { - l.Errorf("execute task: %v", err.Error()) - } - }) - return - } - } -} - -func (p *Poller) Poll(ctx context.Context) error { - l := log.WithField("func", "Poll") - - for { - // check worker number - p.schedule() - - select { - // wait worker ready - case <-p.ready: - case <-ctx.Done(): - return nil - } - p.handle(ctx, l) - } -} - -func (p *Poller) pollTask(ctx context.Context) (*runnerv1.Task, error) { - l := log.WithField("func", "pollTask") - l.Info("poller: request stage from remote server") - - reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - // request a new build stage for execution from the central - // build server. - resp, err := p.Client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{})) - if err == context.Canceled || err == context.DeadlineExceeded { - l.WithError(err).Trace("poller: no stage returned") - return nil, nil - } - - if err != nil && err == ErrDataLock { - l.WithError(err).Info("task accepted by another runner") - return nil, nil - } - - if err != nil { - l.WithError(err).Error("cannot accept task") - return nil, err - } - - // exit if a nil or empty stage is returned from the system - // and allow the runner to retry. - if resp.Msg.Task == nil || resp.Msg.Task.Id == 0 { - return nil, nil - } - - return resp.Msg.Task, nil -} - -func (p *Poller) dispatchTask(ctx context.Context, task *runnerv1.Task) error { - l := log.WithField("func", "dispatchTask") - defer func() { - e := recover() - if e != nil { - l.Errorf("panic error: %v", e) - } - }() - - runCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.Timeout) - defer cancel() - - return p.Dispatch(runCtx, task) -} diff --git a/poller/thread.go b/poller/thread.go deleted file mode 100644 index 7fa6af1..0000000 --- a/poller/thread.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2022 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package poller - -import "sync" - -type routineGroup struct { - waitGroup sync.WaitGroup -} - -func newRoutineGroup() *routineGroup { - return new(routineGroup) -} - -func (g *routineGroup) Run(fn func()) { - g.waitGroup.Add(1) - - go func() { - defer g.waitGroup.Done() - fn() - }() -} - -func (g *routineGroup) Wait() { - g.waitGroup.Wait() -} diff --git a/runtime/label.go b/runtime/label.go deleted file mode 100644 index c7aa001..0000000 --- a/runtime/label.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2023 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package runtime - -import ( - "fmt" - "strings" -) - -func ParseLabel(str string) (label, schema, arg string, err error) { - splits := strings.SplitN(str, ":", 3) - label = splits[0] - schema = "host" - arg = "" - if len(splits) >= 2 { - schema = splits[1] - } - if len(splits) >= 3 { - arg = splits[2] - } - if schema != "host" && schema != "docker" { - return "", "", "", fmt.Errorf("unsupported schema: %s", schema) - } - return -} diff --git a/runtime/label_test.go b/runtime/label_test.go deleted file mode 100644 index f17c372..0000000 --- a/runtime/label_test.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2023 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package runtime - -import "testing" - -func TestParseLabel(t *testing.T) { - tests := []struct { - args string - wantLabel string - wantSchema string - wantArg string - wantErr bool - }{ - { - args: "ubuntu:docker://node:18", - wantLabel: "ubuntu", - wantSchema: "docker", - wantArg: "//node:18", - wantErr: false, - }, - { - args: "ubuntu:host", - wantLabel: "ubuntu", - wantSchema: "host", - wantArg: "", - wantErr: false, - }, - { - args: "ubuntu", - wantLabel: "ubuntu", - wantSchema: "host", - wantArg: "", - wantErr: false, - }, - { - args: "ubuntu:vm:ubuntu-18.04", - wantLabel: "", - wantSchema: "", - wantArg: "", - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.args, func(t *testing.T) { - gotLabel, gotSchema, gotArg, err := ParseLabel(tt.args) - if (err != nil) != tt.wantErr { - t.Errorf("parseLabel() error = %v, wantErr %v", err, tt.wantErr) - return - } - if gotLabel != tt.wantLabel { - t.Errorf("parseLabel() gotLabel = %v, want %v", gotLabel, tt.wantLabel) - } - if gotSchema != tt.wantSchema { - t.Errorf("parseLabel() gotSchema = %v, want %v", gotSchema, tt.wantSchema) - } - if gotArg != tt.wantArg { - t.Errorf("parseLabel() gotArg = %v, want %v", gotArg, tt.wantArg) - } - }) - } -} diff --git a/runtime/runtime.go b/runtime/runtime.go deleted file mode 100644 index 8e8bf01..0000000 --- a/runtime/runtime.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2022 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package runtime - -import ( - "context" - "strings" - - runnerv1 "code.gitea.io/actions-proto-go/runner/v1" - log "github.com/sirupsen/logrus" - - "gitea.com/gitea/act_runner/artifactcache" - "gitea.com/gitea/act_runner/client" -) - -// Runner runs the pipeline. -type Runner struct { - Machine string - Version string - ForgeInstance string - Environ map[string]string - Client client.Client - Labels []string - Network string - CacheHandler *artifactcache.Handler -} - -// Run runs the pipeline stage. -func (s *Runner) Run(ctx context.Context, task *runnerv1.Task) error { - env := map[string]string{} - for k, v := range s.Environ { - env[k] = v - } - if s.CacheHandler != nil { - env["ACTIONS_CACHE_URL"] = s.CacheHandler.ExternalURL() + "/" - } - return NewTask(task.Id, s.Client, env, s.Network, s.platformPicker).Run(ctx, task, s.Machine, s.Version) -} - -func (s *Runner) platformPicker(labels []string) string { - platforms := make(map[string]string, len(s.Labels)) - for _, l := range s.Labels { - label, schema, arg, err := ParseLabel(l) - if err != nil { - log.Errorf("invaid label %q: %v", l, err) - continue - } - - switch schema { - case "docker": - // TODO "//" will be ignored, maybe we should use 'ubuntu-18.04:docker:node:16-buster' instead - platforms[label] = strings.TrimPrefix(arg, "//") - case "host": - platforms[label] = "-self-hosted" - default: - // It should not happen, because ParseLabel has checked it. - continue - } - } - - for _, label := range labels { - if v, ok := platforms[label]; ok { - return v - } - } - - // TODO: support multiple labels - // like: - // ["ubuntu-22.04"] => "ubuntu:22.04" - // ["with-gpu"] => "linux:with-gpu" - // ["ubuntu-22.04", "with-gpu"] => "ubuntu:22.04_with-gpu" - - // return default. - // So the runner receives a task with a label that the runner doesn't have, - // it happens when the user have edited the label of the runner in the web UI. - return "node:16-bullseye" // TODO: it may be not correct, what if the runner is used as host mode only? -} diff --git a/runtime/task.go b/runtime/task.go deleted file mode 100644 index e11f65d..0000000 --- a/runtime/task.go +++ /dev/null @@ -1,267 +0,0 @@ -// Copyright 2022 The Gitea Authors. All rights reserved. -// SPDX-License-Identifier: MIT - -package runtime - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "os" - "path/filepath" - "sync" - "time" - - runnerv1 "code.gitea.io/actions-proto-go/runner/v1" - "github.com/nektos/act/pkg/common" - "github.com/nektos/act/pkg/model" - "github.com/nektos/act/pkg/runner" - log "github.com/sirupsen/logrus" - - "gitea.com/gitea/act_runner/client" -) - -var globalTaskMap sync.Map - -type TaskInput struct { - repoDirectory string - // actor string - // workdir string - // workflowsPath string - // autodetectEvent bool - // eventPath string - // reuseContainers bool - // bindWorkdir bool - // secrets []string - envs map[string]string - // platforms []string - // dryrun bool - forcePull bool - forceRebuild bool - // noOutput bool - // envfile string - // secretfile string - insecureSecrets bool - // defaultBranch string - privileged bool - usernsMode string - containerArchitecture string - containerDaemonSocket string - // noWorkflowRecurse bool - useGitIgnore bool - containerCapAdd []string - containerCapDrop []string - // autoRemove bool - artifactServerPath string - artifactServerPort string - jsonLogger bool - // noSkipCheckout bool - // remoteName string - - EnvFile string - - containerNetworkMode string -} - -type Task struct { - BuildID int64 - Input *TaskInput - - client client.Client - log *log.Entry - platformPicker func([]string) string -} - -// NewTask creates a new task -func NewTask(buildID int64, client client.Client, runnerEnvs map[string]string, network string, picker func([]string) string) *Task { - task := &Task{ - Input: &TaskInput{ - envs: runnerEnvs, - containerNetworkMode: network, - }, - BuildID: buildID, - - client: client, - log: log.WithField("buildID", buildID), - platformPicker: picker, - } - task.Input.repoDirectory, _ = os.Getwd() - return task -} - -// getWorkflowsPath return the workflows directory, it will try .gitea first and then fallback to .github -func getWorkflowsPath(dir string) (string, error) { - p := filepath.Join(dir, ".gitea/workflows") - _, err := os.Stat(p) - if err != nil { - if !os.IsNotExist(err) { - return "", err - } - return filepath.Join(dir, ".github/workflows"), nil - } - return p, nil -} - -func getToken(task *runnerv1.Task) string { - token := task.Secrets["GITHUB_TOKEN"] - if task.Secrets["GITEA_TOKEN"] != "" { - token = task.Secrets["GITEA_TOKEN"] - } - if task.Context.Fields["token"].GetStringValue() != "" { - token = task.Context.Fields["token"].GetStringValue() - } - return token -} - -func (t *Task) Run(ctx context.Context, task *runnerv1.Task, runnerName, runnerVersion string) (lastErr error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - _, exist := globalTaskMap.Load(task.Id) - if exist { - return fmt.Errorf("task %d already exists", task.Id) - } - - // set task ve to global map - // when task is done or canceled, it will be removed from the map - globalTaskMap.Store(task.Id, t) - defer globalTaskMap.Delete(task.Id) - - lastWords := "" - reporter := NewReporter(ctx, cancel, t.client, task) - defer func() { - // set the job to failed on an error return value - if lastErr != nil { - reporter.Fire(&log.Entry{ - Data: log.Fields{ - "jobResult": "failure", - }, - Time: time.Now(), - }) - } - _ = reporter.Close(lastWords) - }() - reporter.RunDaemon() - - reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", runnerName, runnerVersion, task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue()) - - workflowsPath, err := getWorkflowsPath(t.Input.repoDirectory) - if err != nil { - lastWords = err.Error() - return err - } - t.log.Debugf("workflows path: %s", workflowsPath) - - workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload)) - if err != nil { - lastWords = err.Error() - return err - } - - jobIDs := workflow.GetJobIDs() - if len(jobIDs) != 1 { - err := fmt.Errorf("multiple jobs found: %v", jobIDs) - lastWords = err.Error() - return err - } - jobID := jobIDs[0] - plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID) - if err != nil { - lastWords = err.Error() - return err - } - job := workflow.GetJob(jobID) - reporter.ResetSteps(len(job.Steps)) - - log.Infof("plan: %+v", plan.Stages[0].Runs) - - token := getToken(task) - dataContext := task.Context.Fields - - log.Infof("task %v repo is %v %v %v", task.Id, dataContext["repository"].GetStringValue(), - dataContext["gitea_default_actions_url"].GetStringValue(), - t.client.Address()) - - preset := &model.GithubContext{ - Event: dataContext["event"].GetStructValue().AsMap(), - RunID: dataContext["run_id"].GetStringValue(), - RunNumber: dataContext["run_number"].GetStringValue(), - Actor: dataContext["actor"].GetStringValue(), - Repository: dataContext["repository"].GetStringValue(), - EventName: dataContext["event_name"].GetStringValue(), - Sha: dataContext["sha"].GetStringValue(), - Ref: dataContext["ref"].GetStringValue(), - RefName: dataContext["ref_name"].GetStringValue(), - RefType: dataContext["ref_type"].GetStringValue(), - HeadRef: dataContext["head_ref"].GetStringValue(), - BaseRef: dataContext["base_ref"].GetStringValue(), - Token: token, - RepositoryOwner: dataContext["repository_owner"].GetStringValue(), - RetentionDays: dataContext["retention_days"].GetStringValue(), - } - eventJSON, err := json.Marshal(preset.Event) - if err != nil { - lastWords = err.Error() - return err - } - - maxLifetime := 3 * time.Hour - if deadline, ok := ctx.Deadline(); ok { - maxLifetime = time.Until(deadline) - } - - input := t.Input - config := &runner.Config{ - // On Linux, Workdir will be like "//" - // On Windows, Workdir will be like "\\" - Workdir: filepath.FromSlash(string(filepath.Separator) + preset.Repository), - BindWorkdir: false, - ReuseContainers: false, - ForcePull: input.forcePull, - ForceRebuild: input.forceRebuild, - LogOutput: true, - JSONLogger: input.jsonLogger, - Env: input.envs, - Secrets: task.Secrets, - InsecureSecrets: input.insecureSecrets, - Privileged: input.privileged, - UsernsMode: input.usernsMode, - ContainerArchitecture: input.containerArchitecture, - ContainerDaemonSocket: input.containerDaemonSocket, - UseGitIgnore: input.useGitIgnore, - GitHubInstance: t.client.Address(), - ContainerCapAdd: input.containerCapAdd, - ContainerCapDrop: input.containerCapDrop, - AutoRemove: true, - ArtifactServerPath: input.artifactServerPath, - ArtifactServerPort: input.artifactServerPort, - NoSkipCheckout: true, - PresetGitHubContext: preset, - EventJSON: string(eventJSON), - ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%d", task.Id), - ContainerMaxLifetime: maxLifetime, - ContainerNetworkMode: input.containerNetworkMode, - DefaultActionInstance: dataContext["gitea_default_actions_url"].GetStringValue(), - PlatformPicker: t.platformPicker, - } - r, err := runner.New(config) - if err != nil { - lastWords = err.Error() - return err - } - - executor := r.NewPlanExecutor(plan) - - t.log.Infof("workflow prepared") - reporter.Logf("workflow prepared") - - // add logger recorders - ctx = common.WithLoggerHook(ctx, reporter) - - if err := executor(ctx); err != nil { - lastWords = err.Error() - return err - } - - return nil -}