Skip to content

Commit

Permalink
Fully working
Browse files Browse the repository at this point in the history
  • Loading branch information
boxbeam committed May 16, 2024
1 parent ab414fa commit 0ec5736
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ee/tabby-db-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub fn query_paged_as(input: TokenStream) -> TokenStream {
let backwards = input.backwards;
quote! {
sqlx::query_as(&crate::make_pagination_query_with_condition({
let _ = sqlx::query_as!(#typ, "SELECT " + #columns + " FROM " + #table_name);
let _ = sqlx::query_as!(#typ, "SELECT " + #columns + " FROM (SELECT * FROM " + #table_name + ")");
&#table_name
}, &[ #(#column_args),* ], #limit, #skip_id, #backwards, #condition))
}
Expand Down
16 changes: 9 additions & 7 deletions ee/tabby-db/src/provided_repositories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl DbConn {

pub async fn list_provided_repositories(
&self,
provider_ids: Vec<i64>,
integration_ids: Vec<i64>,
kind: Option<String>,
active: Option<bool>,
limit: Option<usize>,
Expand All @@ -70,13 +70,15 @@ impl DbConn {
) -> Result<Vec<ProvidedRepositoryDAO>> {
let mut conditions = vec![];

let provider_ids = provider_ids
let integration_ids = integration_ids
.into_iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(", ");
if !provider_ids.is_empty() {
conditions.push(format!("access_token_provider_id IN ({provider_ids})"));
if !integration_ids.is_empty() {
conditions.push(format!(
"integration_access_token_id IN ({integration_ids})"
));
}

let active_filter = active.map(|active| format!("active = {active}"));
Expand All @@ -91,14 +93,14 @@ impl DbConn {
ProvidedRepositoryDAO,
"provided_repositories JOIN integration_access_tokens ON integration_access_token_id = integration_access_tokens.id",
[
"provided_repositories.id" as "id",
"id",
"vendor_id",
"name",
"git_url",
"active",
"integration_access_token_id",
"provided_repositories.created_at" as "created_at: DateTimeUtc",
"provided_repositories.updated_at" as "updated_at: DateTimeUtc"
"created_at",
"updated_at"
],
limit,
skip_id,
Expand Down
3 changes: 2 additions & 1 deletion ee/tabby-schema/src/schema/repository/third_party.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub trait ThirdPartyRepositoryService: Send + Sync {
first: Option<usize>,
last: Option<usize>,
) -> Result<Vec<ProvidedRepository>>;
async fn get_repository(&self, id: ID) -> Result<ProvidedRepository>;

async fn update_repository_active(&self, id: ID, active: bool) -> Result<()>;
async fn upsert_repository(
Expand All @@ -55,7 +56,7 @@ pub trait ThirdPartyRepositoryService: Send + Sync {
git_url: String,
) -> Result<()>;
async fn list_active_git_urls(&self) -> Result<Vec<String>>;
async fn sync_repositories(&self, kind: IntegrationKind) -> Result<()>;
async fn sync_repositories(&self, integration_id: ID) -> Result<()>;
async fn delete_outdated_repositories(
&self,
integration_id: ID,
Expand Down
1 change: 1 addition & 0 deletions ee/tabby-webserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ gitlab = "0.1610.0"
apalis = { git = "https://github.com/wsxiaoys/apalis", rev = "91526e8", features = ["sqlite", "cron" ] }
uuid.workspace = true
hyper-util = { version = "0.1.3", features = ["client-legacy"] }
strum.workspace = true

[dev-dependencies]
assert_matches = "1.5.0"
Expand Down
27 changes: 27 additions & 0 deletions ee/tabby-webserver/src/service/background_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,44 @@ mod github;
mod gitlab;
mod helper;
mod scheduler;
mod third_party_integration;

use std::sync::Arc;

use apalis::{
prelude::{Monitor, Storage},
sqlite::{SqlitePool, SqliteStorage},
};
use juniper::ID;
use tabby_common::config::{RepositoryAccess, RepositoryConfig};
use tabby_db::DbConn;
use tabby_schema::{integration::IntegrationService, repository::ThirdPartyRepositoryService};

use self::{
db::DbMaintainanceJob, github::SyncGithubJob, gitlab::SyncGitlabJob, scheduler::SchedulerJob,
third_party_integration::ThirdPartyRepositorySyncJob,
};
use crate::path::job_db_file;

/// Events sent over the background-job channel to enqueue work on the
/// corresponding apalis queue.
pub enum BackgroundJobEvent {
    /// Run the scheduler job for one repository configuration.
    Scheduler(RepositoryConfig),
    /// Sync repositories for the GitHub provider with this database row id.
    SyncGithub(i64),
    /// Sync repositories for the GitLab provider with this database row id.
    SyncGitlab(i64),
    /// Sync all repositories belonging to the integration with this GraphQL `ID`.
    SyncThirdPartyRepositories(ID),
}

/// Holds one sqlite-backed apalis queue per background job type; events
/// received on the channel are pushed into the matching queue.
struct BackgroundJobImpl {
    scheduler: SqliteStorage<SchedulerJob>,
    gitlab: SqliteStorage<SyncGitlabJob>,
    github: SqliteStorage<SyncGithubJob>,
    third_party_repository: SqliteStorage<ThirdPartyRepositorySyncJob>,
}

pub async fn start(
db: DbConn,
repository_access: Arc<dyn RepositoryAccess>,
third_party_repository_service: Arc<dyn ThirdPartyRepositoryService>,
integration_service: Arc<dyn IntegrationService>,
mut receiver: tokio::sync::mpsc::UnboundedReceiver<BackgroundJobEvent>,
) {
let path = format!("sqlite://{}?mode=rwc", job_db_file().display());
Expand All @@ -49,6 +57,13 @@ pub async fn start(
SchedulerJob::register(monitor, pool.clone(), db.clone(), repository_access);
let (gitlab, monitor) = SyncGitlabJob::register(monitor, pool.clone(), db.clone());
let (github, monitor) = SyncGithubJob::register(monitor, pool.clone(), db.clone());
let (third_party_repository, monitor) = ThirdPartyRepositorySyncJob::register(
monitor,
pool.clone(),
db.clone(),
third_party_repository_service,
integration_service,
);

tokio::spawn(async move {
monitor.run().await.expect("failed to start worker");
Expand All @@ -59,6 +74,7 @@ pub async fn start(
scheduler,
gitlab,
github,
third_party_repository,
};

while let Some(event) = receiver.recv().await {
Expand All @@ -76,6 +92,14 @@ impl BackgroundJobImpl {
.expect("unable to push job");
}

async fn trigger_sync_integration(&self, provider_id: ID) {
self.third_party_repository
.clone()
.push(ThirdPartyRepositorySyncJob::new(provider_id))
.await
.expect("Unable to push job");
}

async fn trigger_sync_github(&self, provider_id: i64) {
self.github
.clone()
Expand All @@ -101,6 +125,9 @@ impl BackgroundJobImpl {
BackgroundJobEvent::SyncGitlab(provider_id) => {
self.trigger_sync_gitlab(provider_id).await
}
BackgroundJobEvent::SyncThirdPartyRepositories(integration_id) => {
self.trigger_sync_integration(integration_id).await
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::sync::Arc;

use apalis::{
prelude::{Data, Job, Monitor, Storage, WorkerFactoryFn},
sqlite::{SqlitePool, SqliteStorage},
utils::TokioExecutor,
};
use chrono::{DateTime, Utc};
use juniper::ID;
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator;
use tabby_db::DbConn;
use tabby_schema::{
integration::{IntegrationKind, IntegrationService},
repository::ThirdPartyRepositoryService,
};
use tracing::debug;

use super::{helper::BasicJob, helper::CronJob, scheduler::SchedulerJob};

/// Background job that synchronizes the repositories provided by a single
/// third-party integration.
#[derive(Serialize, Deserialize, Clone)]
pub struct ThirdPartyRepositorySyncJob {
    // GraphQL `ID` of the integration whose repositories should be synced.
    integration_id: ID,
}

impl Job for ThirdPartyRepositorySyncJob {
    // Queue name apalis uses to identify this job type in storage.
    const NAME: &'static str = "third_party_repository_sync";
}

impl CronJob for ThirdPartyRepositorySyncJob {
    // Re-enqueue sync jobs for every integration once an hour.
    const SCHEDULE: &'static str = "@hourly";
}

impl ThirdPartyRepositorySyncJob {
    /// Create a sync job targeting the integration with the given id.
    pub fn new(integration_id: ID) -> Self {
        Self { integration_id }
    }

    /// Worker entry point: delegates the actual sync to the repository
    /// service and propagates any error back to the job runner.
    async fn run(
        self,
        repository_service: Data<Arc<dyn ThirdPartyRepositoryService>>,
    ) -> tabby_schema::Result<()> {
        repository_service
            .sync_repositories(self.integration_id)
            .await?;
        Ok(())
    }

    /// Cron entry point: enqueue one sync job per known integration.
    async fn cron(
        _now: DateTime<Utc>,
        storage: Data<SqliteStorage<ThirdPartyRepositorySyncJob>>,
        integration_service: Data<Arc<dyn IntegrationService>>,
    ) -> tabby_schema::Result<()> {
        debug!("Syncing all third-party repositories");

        let mut storage = (*storage).clone();
        // All-`None` arguments are assumed to list every integration,
        // unfiltered and unpaginated — TODO confirm against the
        // IntegrationService contract.
        for integration in integration_service
            .list_integrations(None, None, None, None, None, None)
            .await?
        {
            storage
                .push(ThirdPartyRepositorySyncJob::new(integration.id))
                .await
                .expect("Unable to push job");
        }
        Ok(())
    }

    /// Register both the basic worker (executes queued sync jobs) and the
    /// cron worker (enqueues jobs on the hourly schedule) on the monitor.
    ///
    /// Returns the job storage so callers can also push jobs on demand.
    pub fn register(
        monitor: Monitor<TokioExecutor>,
        pool: SqlitePool,
        db: DbConn,
        repository_service: Arc<dyn ThirdPartyRepositoryService>,
        integration_service: Arc<dyn IntegrationService>,
    ) -> (
        SqliteStorage<ThirdPartyRepositorySyncJob>,
        Monitor<TokioExecutor>,
    ) {
        let storage = SqliteStorage::new(pool);
        let monitor = monitor
            .register(
                Self::basic_worker(storage.clone(), db.clone())
                    // Last use of repository_service — move it instead of
                    // taking a redundant Arc clone.
                    .data(repository_service)
                    .build_fn(Self::run),
            )
            .register(
                Self::cron_worker(db)
                    .data(integration_service)
                    .data(storage.clone())
                    .build_fn(ThirdPartyRepositorySyncJob::cron),
            );
        (storage, monitor)
    }
}
17 changes: 14 additions & 3 deletions ee/tabby-webserver/src/service/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@ use tabby_schema::{
integration::{IntegrationAccessToken, IntegrationKind, IntegrationService},
AsID, AsRowid, DbEnum, Result,
};
use tokio::sync::mpsc::UnboundedSender;

use crate::service::background_job::BackgroundJobEvent;

use super::graphql_pagination_to_filter;

struct IntegrationServiceImpl {
    // Database handle backing all integration CRUD operations.
    db: DbConn,
    // Channel for notifying the background job runner, e.g. to trigger a
    // repository sync right after an integration is created.
    background_job: UnboundedSender<BackgroundJobEvent>,
}

pub fn create(db: DbConn) -> impl IntegrationService {
IntegrationServiceImpl { db }
/// Construct the integration service backed by `db`, wired to the background
/// job channel so newly created integrations can schedule repository syncs.
pub fn create(
    db: DbConn,
    background_job: UnboundedSender<BackgroundJobEvent>,
) -> impl IntegrationService {
    IntegrationServiceImpl { db, background_job }
}

#[async_trait]
Expand All @@ -32,7 +39,11 @@ impl IntegrationService for IntegrationServiceImpl {
access_token,
)
.await?;
Ok(id.as_id())
let id = id.as_id();
let _ = self
.background_job
.send(BackgroundJobEvent::SyncThirdPartyRepositories(id.clone()));
Ok(id)
}

async fn delete_integration(&self, id: ID) -> Result<()> {
Expand Down
18 changes: 6 additions & 12 deletions ee/tabby-webserver/src/service/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn create(
git: Arc::new(git::create(db.clone(), background.clone())),
github: Arc::new(github::create(db.clone(), background.clone())),
gitlab: Arc::new(gitlab::create(db.clone(), background.clone())),
third_party: Arc::new(third_party::create(db, integration)),
third_party: Arc::new(third_party::create(db, integration, background.clone())),
})
}

Expand All @@ -54,16 +54,7 @@ impl RepositoryAccess for RepositoryServiceImpl {
.collect();

repos.extend(
self.github
.list_active_git_urls()
.await
.unwrap_or_default()
.into_iter()
.map(RepositoryConfig::new),
);

repos.extend(
self.gitlab
self.third_party
.list_active_git_urls()
.await
.unwrap_or_default()
Expand Down Expand Up @@ -149,6 +140,8 @@ impl RepositoryService for RepositoryServiceImpl {
#[cfg(test)]
mod tests {
use tabby_db::DbConn;
use tokio::sync::mpsc::WeakUnboundedSender;
use tracing::instrument::WithSubscriber;

use super::*;

Expand All @@ -160,7 +153,8 @@ mod tests {
#[tokio::test]
async fn test_list_repositories() {
let db = DbConn::new_in_memory().await.unwrap();
let integration = Arc::new(crate::service::integration::create(db.clone()));
let (background, _) = tokio::sync::mpsc::unbounded_channel();
let integration = Arc::new(crate::service::integration::create(db.clone(), background));
let service = create(db.clone(), integration, create_fake());
service
.git()
Expand Down

0 comments on commit 0ec5736

Please sign in to comment.