Skip to content

Commit

Permalink
Merge pull request #263 from gabriel-samfira/add-database-watcher
Browse files Browse the repository at this point in the history
Add database watcher
  • Loading branch information
gabriel-samfira committed Jun 20, 2024
2 parents a66cbcc + 230f002 commit c188a6f
Show file tree
Hide file tree
Showing 27 changed files with 1,900 additions and 110 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ GARM supports creating pools on either GitHub itself or on your own deployment o

Through the use of providers, `GARM` can create runners in a variety of environments using the same `GARM` instance. Whether you want to create pools of runners in your OpenStack cloud, your Azure cloud and your Kubernetes cluster, that is easily achieved by just installing the appropriate providers, configuring them in `GARM` and creating pools that use them. You can create zero-runner pools for instances with high costs (large VMs, GPU enabled instances, etc) and have them spin up on demand, or you can create large pools of k8s backed runners that can be used for your CI/CD pipelines at a moment's notice. You can mix them up and create pools in any combination of providers or resource allocations you want.

:warning: **Important note**: The README and documentation in the `main` branch are relevant to the not yet released code that is present in `main`. Following the documentation from the `main` branch for a stable release of GARM, may lead to errors. To view the documentation for the latest stable release, please switch to the appropriate tag. For information about setting up `v0.1.4`, please refer to the [v0.1.4 tag](https://github.com/cloudbase/garm/tree/v0.1.4)
:warning: **Important note**: The README and documentation in the `main` branch are relevant to the not yet released code that is present in `main`. Following the documentation from the `main` branch for a stable release of GARM, may lead to errors. To view the documentation for the latest stable release, please switch to the appropriate tag. For information about setting up `v0.1.4`, please refer to the [v0.1.4 tag](https://github.com/cloudbase/garm/tree/v0.1.4).

## Join us on slack

Expand Down
3 changes: 3 additions & 0 deletions cmd/garm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cloudbase/garm/config"
"github.com/cloudbase/garm/database"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/database/watcher"
"github.com/cloudbase/garm/metrics"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner" //nolint:typecheck
Expand Down Expand Up @@ -183,6 +184,7 @@ func main() {
}
ctx, stop := signal.NotifyContext(context.Background(), signals...)
defer stop()
watcher.InitWatcher(ctx)

ctx = auth.GetAdminContext(ctx)

Expand Down Expand Up @@ -313,6 +315,7 @@ func main() {
}()

<-ctx.Done()

shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 60*time.Second)
defer shutdownCancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions database/common/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package common

import "fmt"

var (
ErrProducerClosed = fmt.Errorf("producer is closed")
ErrProducerTimeoutErr = fmt.Errorf("producer timeout error")
ErrProducerAlreadyRegistered = fmt.Errorf("producer already registered")
ErrConsumerAlreadyRegistered = fmt.Errorf("consumer already registered")
ErrWatcherAlreadyStarted = fmt.Errorf("watcher already started")
ErrWatcherNotInitialized = fmt.Errorf("watcher not initialized")
)
7 changes: 5 additions & 2 deletions database/common/common.go → database/common/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type JobsStore interface {
DeleteCompletedJobs(ctx context.Context) error
}

type EntityPools interface {
type EntityPoolStore interface {
CreateEntityPool(ctx context.Context, entity params.GithubEntity, param params.CreatePoolParams) (params.Pool, error)
GetEntityPool(ctx context.Context, entity params.GithubEntity, poolID string) (params.Pool, error)
DeleteEntityPool(ctx context.Context, entity params.GithubEntity, poolID string) error
Expand All @@ -144,8 +144,11 @@ type Store interface {
UserStore
InstanceStore
JobsStore
EntityPools
GithubEndpointStore
GithubCredentialsStore
ControllerStore
EntityPoolStore

ControllerInfo() (params.ControllerInfo, error)
InitController() (params.ControllerInfo, error)
}
53 changes: 53 additions & 0 deletions database/common/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package common

import "context"

type (
DatabaseEntityType string
OperationType string
PayloadFilterFunc func(ChangePayload) bool
)

const (
RepositoryEntityType DatabaseEntityType = "repository"
OrganizationEntityType DatabaseEntityType = "organization"
EnterpriseEntityType DatabaseEntityType = "enterprise"
PoolEntityType DatabaseEntityType = "pool"
UserEntityType DatabaseEntityType = "user"
InstanceEntityType DatabaseEntityType = "instance"
JobEntityType DatabaseEntityType = "job"
ControllerEntityType DatabaseEntityType = "controller"
GithubCredentialsEntityType DatabaseEntityType = "github_credentials" // #nosec G101
GithubEndpointEntityType DatabaseEntityType = "github_endpoint"
)

const (
CreateOperation OperationType = "create"
UpdateOperation OperationType = "update"
DeleteOperation OperationType = "delete"
)

type ChangePayload struct {
EntityType DatabaseEntityType
Operation OperationType
Payload interface{}
}

type Consumer interface {
Watch() <-chan ChangePayload
IsClosed() bool
Close()
SetFilters(filters ...PayloadFilterFunc)
}

type Producer interface {
Notify(ChangePayload) error
IsClosed() bool
Close()
}

type Watcher interface {
RegisterProducer(ctx context.Context, ID string) (Producer, error)
RegisterConsumer(ctx context.Context, ID string, filters ...PayloadFilterFunc) (Consumer, error)
Close()
}
58 changes: 35 additions & 23 deletions database/sql/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"gorm.io/gorm"

runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
)

Expand Down Expand Up @@ -82,38 +83,49 @@ func (s *sqlDatabase) InitController() (params.ControllerInfo, error) {
}, nil
}

func (s *sqlDatabase) UpdateController(info params.UpdateControllerParams) (params.ControllerInfo, error) {
func (s *sqlDatabase) UpdateController(info params.UpdateControllerParams) (paramInfo params.ControllerInfo, err error) {
defer func() {
if err == nil {
s.sendNotify(common.ControllerEntityType, common.UpdateOperation, paramInfo)
}
}()
var dbInfo ControllerInfo
q := s.conn.Model(&ControllerInfo{}).First(&dbInfo)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return params.ControllerInfo{}, errors.Wrap(runnerErrors.ErrNotFound, "fetching controller info")
err = s.conn.Transaction(func(tx *gorm.DB) error {
q := tx.Model(&ControllerInfo{}).First(&dbInfo)
if q.Error != nil {
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
return errors.Wrap(runnerErrors.ErrNotFound, "fetching controller info")
}
return errors.Wrap(q.Error, "fetching controller info")
}
return params.ControllerInfo{}, errors.Wrap(q.Error, "fetching controller info")
}

if err := info.Validate(); err != nil {
return params.ControllerInfo{}, errors.Wrap(err, "validating controller info")
}
if err := info.Validate(); err != nil {
return errors.Wrap(err, "validating controller info")
}

if info.MetadataURL != nil {
dbInfo.MetadataURL = *info.MetadataURL
}
if info.MetadataURL != nil {
dbInfo.MetadataURL = *info.MetadataURL
}

if info.CallbackURL != nil {
dbInfo.CallbackURL = *info.CallbackURL
}
if info.CallbackURL != nil {
dbInfo.CallbackURL = *info.CallbackURL
}

if info.WebhookURL != nil {
dbInfo.WebhookBaseURL = *info.WebhookURL
}
if info.WebhookURL != nil {
dbInfo.WebhookBaseURL = *info.WebhookURL
}

q = s.conn.Save(&dbInfo)
if q.Error != nil {
return params.ControllerInfo{}, errors.Wrap(q.Error, "saving controller info")
q = tx.Save(&dbInfo)
if q.Error != nil {
return errors.Wrap(q.Error, "saving controller info")
}
return nil
})
if err != nil {
return params.ControllerInfo{}, errors.Wrap(err, "updating controller info")
}

paramInfo, err := dbControllerToCommonController(dbInfo)
paramInfo, err = dbControllerToCommonController(dbInfo)
if err != nil {
return params.ControllerInfo{}, errors.Wrap(err, "converting controller info")
}
Expand Down
38 changes: 31 additions & 7 deletions database/sql/enterprise.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,32 @@ package sql

import (
"context"
"log/slog"

"github.com/google/uuid"
"github.com/pkg/errors"
"gorm.io/gorm"

runnerErrors "github.com/cloudbase/garm-provider-common/errors"
"github.com/cloudbase/garm-provider-common/util"
"github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/params"
)

func (s *sqlDatabase) CreateEnterprise(ctx context.Context, name, credentialsName, webhookSecret string, poolBalancerType params.PoolBalancerType) (params.Enterprise, error) {
func (s *sqlDatabase) CreateEnterprise(ctx context.Context, name, credentialsName, webhookSecret string, poolBalancerType params.PoolBalancerType) (paramEnt params.Enterprise, err error) {
if webhookSecret == "" {
return params.Enterprise{}, errors.New("creating enterprise: missing secret")
}
secret, err := util.Seal([]byte(webhookSecret), []byte(s.cfg.Passphrase))
if err != nil {
return params.Enterprise{}, errors.Wrap(err, "encoding secret")
}

defer func() {
if err == nil {
s.sendNotify(common.EnterpriseEntityType, common.CreateOperation, paramEnt)
}
}()
newEnterprise := Enterprise{
Name: name,
WebhookSecret: secret,
Expand Down Expand Up @@ -66,12 +74,12 @@ func (s *sqlDatabase) CreateEnterprise(ctx context.Context, name, credentialsNam
return params.Enterprise{}, errors.Wrap(err, "creating enterprise")
}

param, err := s.sqlToCommonEnterprise(newEnterprise, true)
paramEnt, err = s.sqlToCommonEnterprise(newEnterprise, true)
if err != nil {
return params.Enterprise{}, errors.Wrap(err, "creating enterprise")
}

return param, nil
return paramEnt, nil
}

func (s *sqlDatabase) GetEnterprise(ctx context.Context, name string) (params.Enterprise, error) {
Expand Down Expand Up @@ -124,11 +132,22 @@ func (s *sqlDatabase) ListEnterprises(_ context.Context) ([]params.Enterprise, e
}

func (s *sqlDatabase) DeleteEnterprise(ctx context.Context, enterpriseID string) error {
enterprise, err := s.getEnterpriseByID(ctx, s.conn, enterpriseID)
enterprise, err := s.getEnterpriseByID(ctx, s.conn, enterpriseID, "Endpoint", "Credentials")
if err != nil {
return errors.Wrap(err, "fetching enterprise")
}

defer func(ent Enterprise) {
if err == nil {
asParams, innerErr := s.sqlToCommonEnterprise(ent, true)
if innerErr == nil {
s.sendNotify(common.EnterpriseEntityType, common.DeleteOperation, asParams)
} else {
slog.With(slog.Any("error", innerErr)).ErrorContext(ctx, "error sending delete notification", "enterprise", enterpriseID)
}
}
}(enterprise)

q := s.conn.Unscoped().Delete(&enterprise)
if q.Error != nil && !errors.Is(q.Error, gorm.ErrRecordNotFound) {
return errors.Wrap(q.Error, "deleting enterprise")
Expand All @@ -137,10 +156,15 @@ func (s *sqlDatabase) DeleteEnterprise(ctx context.Context, enterpriseID string)
return nil
}

func (s *sqlDatabase) UpdateEnterprise(ctx context.Context, enterpriseID string, param params.UpdateEntityParams) (params.Enterprise, error) {
func (s *sqlDatabase) UpdateEnterprise(ctx context.Context, enterpriseID string, param params.UpdateEntityParams) (newParams params.Enterprise, err error) {
defer func() {
if err == nil {
s.sendNotify(common.EnterpriseEntityType, common.UpdateOperation, newParams)
}
}()
var enterprise Enterprise
var creds GithubCredentials
err := s.conn.Transaction(func(tx *gorm.DB) error {
err = s.conn.Transaction(func(tx *gorm.DB) error {
var err error
enterprise, err = s.getEnterpriseByID(ctx, tx, enterpriseID)
if err != nil {
Expand Down Expand Up @@ -196,7 +220,7 @@ func (s *sqlDatabase) UpdateEnterprise(ctx context.Context, enterpriseID string,
if err != nil {
return params.Enterprise{}, errors.Wrap(err, "updating enterprise")
}
newParams, err := s.sqlToCommonEnterprise(enterprise, true)
newParams, err = s.sqlToCommonEnterprise(enterprise, true)
if err != nil {
return params.Enterprise{}, errors.Wrap(err, "updating enterprise")
}
Expand Down
Loading

0 comments on commit c188a6f

Please sign in to comment.