Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-20993] Periodic refresh implementation of source control metadata for SCM sync status #15626

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

adrikagupta
Copy link
Contributor

@adrikagupta adrikagupta commented Apr 29, 2024

  • Added Periodic refresh for namespace and repository source control metadata
    • Source control metadata is automatically refreshed every 3 hours which is configurable
    • There is a way to manually refresh the source control metadata by calling the list APIs
    • If the time elapsed since the last refresh is within the buffer time of 10 minutes (this is configurable), then the manual trigger is initiated
      • After setting a repo config, manual trigger is initiated. The above check is skipped.
    • If the service for that namespace is already running, refresh operation is skipped.

@adrikagupta adrikagupta force-pushed the refresh-sync branch 2 times, most recently from eb92835 to d8b506f Compare May 2, 2024 22:36
@GnsP GnsP added build Triggers github actions build 6.10.1 labels May 3, 2024
cdap-common/src/main/resources/cdap-default.xml Outdated Show resolved Hide resolved
* It is used in filtering.
*/
public enum SyncStatus {
SYNCED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !limitReachedAndLastRefreshPair.getKey() || record == null ? null :
record.getName();
Long lastRefreshTime = limitReachedAndLastRefreshPair.getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just expose a function in applicationLifecycleService to get the last refresh time


public void removeRefreshService(NamespaceId namespaceId) {
if(refreshSchedulers.containsKey(namespaceId)) {
refreshSchedulers.get(namespaceId).stop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if there is some Optional return

AppMetadataStore appMetadataStore = getAppMetadataStore(context);
for (Map.Entry<ApplicationId, SourceControlMeta> updateRequest : updateRequests.entrySet()) {
ApplicationId appId = updateRequest.getKey();
if (appMetadataStore.getApplication(appId) != null) {
sourceControlMetadataStore.write(appId.getAppReference(),
SourceControlMeta.builder(updateRequest.getValue()).setSyncStatus(true).build());
if (repoStore.get(appId.getAppReference()) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should always update repostore

TransactionRunners.run(transactionRunner, context -> {
getNamespaceSourceControlMetadataStore(context).write(appRef,
SourceControlMeta.builder(sourceControlMeta).setSyncStatus(true).build());
RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context);
if (repoStore.get(appRef) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

@adrikagupta adrikagupta added build Triggers github actions build and removed 6.10.1 build Triggers github actions build labels May 4, 2024
getNamespaceSourceControlMetadataStore(context).write(id.getAppReference(),
meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta()
: SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true).build());
if (repoStore.get(appRef) != null) {
repoStore.write(appRef, meta.getSourceControlMeta() != null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Please add temporary variables to hold isSync, LastModified value. This way it would be easy to understand what is being passed to write()

+ namespaceId.getNamespace());
refreshSchedulers.get(namespaceId).triggerManualRefresh(false);
} else {
addRefreshService(namespaceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should always happen before the if

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But addRefreshService should be called only if it is not already present in refreshSchedulers, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addRefreshService should have putIfAbsent that would take care of existing scheduler issue

* @param namespaceId The ID of the namespace for which to add the refresh service.
*/
public void addRefreshService(NamespaceId namespaceId) {
LOG.info("Adding refresh service for " + namespaceId.getNamespace());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug log. and add it after you added the scheduler

*/
public void addRefreshService(NamespaceId namespaceId) {
LOG.info("Adding refresh service for " + namespaceId.getNamespace());
if (!refreshSchedulers.containsKey(namespaceId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use putIfNotPresent. The current logic is not threadsafe

LOG.info("Shutting down SourceControlManagementService");

for (NamespaceSourceControlMetadataRefreshService service : refreshSchedulers.values()) {
service.stop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopandwait()

LOG.info("Starting SourceControlMetadataRefreshManager");

List<NamespaceId> namespaceIds = namespaceAdmin.list().stream()
.map(meta -> meta.getNamespaceId()).collect(Collectors.toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can do

  if (getRepositoryMeta(namespaceId) != null) {
        addRefreshService(namespaceId);
      }

here only. no need for a list


// Triggering source control metadata refresh service
try {
checkSourceControlMetadataAutoRefreshFlag();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not do this. Otherwise in case if the flag is false the API would not work at all. Just skip refresh if the flag is not enabled


public SingleSourceControlMetadataResponse(SourceControlMetadataRecord record,
Long lastRefreshTime) {
this.record = record;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont name it record. rather name it app. Similar to how list api is calling this apps

@Override
public void runOneIteration() {

running.set(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please call runrefreshservice here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (!running.compareAndSet(false, true)) {
      return;
    }
  runRefreshService(forced);

</property>
<property>
<name>source.control.metadata.refresh.buffer.seconds</name>
<!-- <value>600</value>-->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix. Set it to 10 min by default

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix checkstyle

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix checkstyle

meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta()
: SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true)
.build());
if (repoStore.get(appRef) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not also write when meta.getSourceControlMeta() != null ?

getNamespaceSourceControlMetadataStore(context).write(id.getAppReference(),
meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta()
: SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true).build());
if (repoStore.get(appRef) != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not also write when meta.getSourceControlMeta() != null ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I have changed to -

if (meta.getSourceControlMeta() != null) {
          Instant lastSyncedAt = meta.getSourceControlMeta().getLastSyncedAt();
          repoStore.write(appRef, true, lastSyncedAt == null ? null :
              lastSyncedAt.toEpochMilli());
        }
        else{
          ImmutablePair<Long, Boolean> lastSyncedAndStatusPair = repoStore.get(appRef);
          if(lastSyncedAndStatusPair != null) {
            repoStore.write(appRef, false, lastSyncedAndStatusPair.getFirst());
          }
        }

Three cases should be considered:

  1. If we are deploying an app in namespace and hasn't been pulled or pushed before - In this case we check, scm meta is null then we dont write anything in repo table since it is not present in the repo table.
  2. If we are deploying an app already been pulled/pushed before - In this case the record will be present in repo table hence we will change the sync status but retain the last modified.
  3. If we are pulling and deploying - In this case since scm meta will not be null we will be setting the values in repo table.

I feel the previous implementation might have also worked based on the concept that if we are pulling and deploying or are deploying an app which has been pulled/pushed before, the record would be there in the repo table.

public void addRefreshService(NamespaceId namespaceId) {
NamespaceSourceControlMetadataRefreshService service =
refreshSchedulers.putIfAbsent(namespaceId, createRefreshService(namespaceId));
if (service == null || isSourceControlMetadataPeriodicRefreshFlagEnabled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the service only if flag is enabled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it is possible that the user disables the periodic refresh flag but keeps manual refresh flag enabled

refreshSchedulers.get(namespaceId).start();
LOG.debug("Added Scheduled NamespaceSourceControlMetadataRefreshService for "
+ namespaceId.getNamespace());
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we triggering here again ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 public void addRefreshService(NamespaceId namespaceId) {
    NamespaceSourceControlMetadataRefreshService service =
        refreshSchedulers.putIfAbsent(namespaceId, createRefreshService(namespaceId));
    if (service == null || isSourceControlMetadataPeriodicRefreshFlagEnabled()) {
      refreshSchedulers.get(namespaceId).start();
    } else {
      refreshSchedulers.get(namespaceId).triggerManualRefresh(true);
    }
  }

If we are setting the repo config for the first time, it creates refresh service, calls start function, and performs runOneIteration.
When we are updating the repo config, it is already present in refreshSchedulers. If start() function is called, it will see that it has already started and will not call runOneIteration instantly. So we have to trigger it manually so that the changes are reflected in the git pipelines page.

protected void startUp() throws Exception {
// This gives users an option to disable the source control metadata refresh service
// for all namespaces
if (this.runInterval <= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this as we already have a feature flag

Copy link
Contributor

@samdgupi samdgupi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge only after requested changes are done

* or null if metadata is not found.
* @throws IOException If an I/O error occurs.
*/
public ImmutablePair get(ApplicationReference appRef) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this changed needed ? why we cant use SourceControlMetadataRecord

lastSyncedAt.toEpochMilli());
}
else{
ImmutablePair<Long, Boolean> lastSyncedAndStatusPair = repoStore.get(appRef);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cant we just use SourceControlMetadataRecord ?

else{
ImmutablePair<Long, Boolean> lastSyncedAndStatusPair = repoStore.get(appRef);
if(lastSyncedAndStatusPair != null) {
repoStore.write(appRef, false, lastSyncedAndStatusPair.getFirst());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned before please add a separate function to toggle sync status only

*/
public Long getLastRefreshTime(NamespaceId namespaceId) {
if (refreshSchedulers.containsKey(namespaceId)) {
return refreshSchedulers.get(namespaceId).getLastRefreshTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct as refreshSchedulers.get(namespaceId) can be null. Rather get refreshSchedulers.get(namespaceId) in a temp variable and check if it is null

protected void shutDown() throws Exception {
LOG.info("Shutting down SourceControlMetadataRefreshService");

for (NamespaceSourceControlMetadataRefreshService service : refreshSchedulers.values()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Use forEach

lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please throw exception and let the http exception handler to handle the response. If you send here, you need to return from the catch block, otherwise the flow will continue.

} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lastRecord variable seems unnecessary. The same can be achieved by apps.isEmpty() and apps.get(apps.size() - 1).

SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would execute another SQL in another transaction, which may be inconsistent with what has been returned by the scanSourceControlMetadata call. I think it's better for the scan method to return a response that contains apps, page limit, and last scan time. This makes sure all the data are coming from the same transaction.

lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. Please throw. The old code was wrong too, please don't follow with that pattern.

SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = sourceControlService.getLastRefreshTime(namespaceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be using primitive long instead.


// Triggering source control metadata refresh service
if (isSourceControlMetadataManualRefreshFlagEnabled()) {
sourceControlMetadataRefreshService.runRefreshService(namespaceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the refreshment is enabled only when there is a scan?


@Override
protected final ScheduledExecutorService executor() {
executor = Executors.newSingleThreadScheduledExecutor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we have one thread per namespace for the refresh? Seems not an efficient way of using thread. Ideally we only need a small scheduled thread pool for all namespaces refresh?

// TODO(CDAP-21017): Optimize periodic refresh of source control metadata
@Override
public void runOneIteration() {
if (!running.compareAndSet(false, true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need an atomic boolean for this. The AbstractScheduledService should have the correct guarantee of how the scheduled run is.


private void runRefreshService(boolean forced) {
running.set(true);
Long refreshStartTime = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use primitive long.


try {

if (!namespaceAdmin.exists(namespaceId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything that happens in one run should happens in the same transaction. Using admin will create a different transaction boundary.

new NamespaceRepository(namespaceId, repoConfig));

// Cleaning up the repo source control metadata table
HashSet<String> repoFileNames = repositoryAppsResponse.getApps().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use interface Set as the variable type.

lastRefreshTime.set(refreshStartTime);

} catch (Exception e) {
LOG.error("Failed to refresh source control metadata for namespace "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use placeholder {} for logging instead of string concat. Also, seems like a warning than error.

@Override
protected void startUp() throws Exception {
LOG.info("Starting SourceControlMetadataRefreshService for namespace "
+ namespaceId.getNamespace() + " with interval " + runInterval + " seconds");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use placeholder {} instead of string concat for loggin.

}

private void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash,
Long refreshStartTime) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use primitive long.

.setNamespace(namespace)
.setLimit(Integer.MAX_VALUE)
.build();
ArrayList<SourceControlMetadataRecord> records = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use List as the type.

throws IOException {
for (SourceControlMetadataRecord record : records) {
if (!repoFiles.contains(record.getName())) {
store.delete(new ApplicationReference(record.getNamespace(), record.getName()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the store.delete actually do? Asking it because this method is nested inside the transactional scan method.

return true;
}

public Long getLastRefreshTime(String namespace) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use primitive long.

Copy link
Contributor

@chtyim chtyim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address the top level comments first. I will take another pass after those are fixed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
6.10.1 build Triggers github actions build
Projects
None yet
4 participants