Skip to content

Commit

Permalink
feat: Sync Metadata table (#17842)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbernays committed May 14, 2024
1 parent 97e2c67 commit 377bde2
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 53 deletions.
4 changes: 4 additions & 0 deletions cli/cmd/migrate_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func migrateConnectionV3(ctx context.Context, sourceClient *managedplugin.Client
return handleSendError(err, writeClients[i], "migrate")
}
}

if err := migrateSummaryTable(writeClients[i], destinationTransformers[i], destinationSpecs[i]); err != nil {
return fmt.Errorf("failed to migrate sync summary table: %w", err)
}
if _, err := writeClients[i].CloseAndRecv(); err != nil {
return err
}
Expand Down
154 changes: 130 additions & 24 deletions cli/cmd/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,36 @@ import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
"github.com/cloudquery/cloudquery/cli/internal/transformer"
"github.com/cloudquery/plugin-pb-go/managedplugin"
"github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
"github.com/cloudquery/plugin-sdk/v4/caser"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/cloudquery/plugin-sdk/v4/transformers"
"github.com/thoas/go-funk"
)

type syncSummary struct {
CliVersion string `json:"cli_version"`
DestinationErrors uint64 `json:"destination_errors"`
DestinationName string `json:"destination_name"`
DestinationPath string `json:"destination_path"`
DestinationVersion string `json:"destination_version"`
DestinationWarnings uint64 `json:"destination_warnings"`
Resources uint64 `json:"resources"`
SourceErrors uint64 `json:"source_errors"`
SourceName string `json:"source_name"`
SourcePath string `json:"source_path"`
SourceVersion string `json:"source_version"`
SourceWarnings uint64 `json:"source_warnings"`
SyncID string `json:"sync_id"`
CLIVersion string `json:"cli_version"`
DestinationErrors uint64 `json:"destination_errors"`
DestinationName string `json:"destination_name"`
DestinationPath string `json:"destination_path"`
DestinationVersion string `json:"destination_version"`
DestinationWarnings uint64 `json:"destination_warnings"`
Resources uint64 `json:"resources"`
SourceErrors uint64 `json:"source_errors"`
SourceName string `json:"source_name"`
SourcePath string `json:"source_path"`
SourceVersion string `json:"source_version"`
SourceWarnings uint64 `json:"source_warnings"`
SyncID string `json:"sync_id"`
SyncTime time.Time `json:"sync_time"`
}

func persistSummary(filename string, summaries []syncSummary) error {
func persistSummary(filename string, summary syncSummary) error {
// if filename is not specified then we don't need to persist the summary and can return
if filename == "" {
return nil
Expand All @@ -32,16 +43,14 @@ func persistSummary(filename string, summaries []syncSummary) error {
if err != nil {
return fmt.Errorf("failed to validate summary file path: %w", err)
}
for _, summary := range summaries {
dataBytes, err := json.Marshal(summary)
if err != nil {
return err
}
dataBytes = append(dataBytes, []byte("\n")...)
err = appendToFile(filename, dataBytes)
if err != nil {
return fmt.Errorf("failed to append summary to file: %w", err)
}
dataBytes, err := json.Marshal(summary)
if err != nil {
return err
}
dataBytes = append(dataBytes, []byte("\n")...)
err = appendToFile(filename, dataBytes)
if err != nil {
return fmt.Errorf("failed to append summary to file: %w", err)
}
return nil
}
Expand All @@ -62,3 +71,100 @@ func checkFilePath(filename string) error {
dirPath := filepath.Dir(filename)
return os.MkdirAll(dirPath, 0755)
}

func generateSummaryTable() (*schema.Table, error) {
tableName := "cloudquery_sync_summaries"
t := schema.Tables{{
Name: tableName,
Transform: transformers.TransformWithStruct(
&syncSummary{},
transformers.WithSkipFields("SyncTime"),
),
}}
if err := transformers.TransformTables(t); err != nil {
return nil, err
}
return t[0], nil
}

func migrateSummaryTable(writeClient plugin.Plugin_WriteClient, destTransformer *transformer.RecordTransformer, spec specs.Destination) error {
if !spec.SyncSummary {
return nil
}

summaryTable, err := generateSummaryTable()
if err != nil {
return err
}
summaryTableSchema := summaryTable.ToArrowSchema()
transformedSchema := destTransformer.TransformSchema(summaryTableSchema)
transformedSchemaBytes, err := plugin.SchemaToBytes(transformedSchema)
if err != nil {
return err
}
wr := &plugin.Write_Request{}
wr.Message = &plugin.Write_Request_MigrateTable{
MigrateTable: &plugin.Write_MessageMigrateTable{
MigrateForce: spec.MigrateMode == specs.MigrateModeForced,
Table: transformedSchemaBytes,
},
}
if err := writeClient.Send(wr); err != nil {
return handleSendError(err, writeClient, "migrate sync summary table")
}
return nil
}

func sendSummary(writeClient plugin.Plugin_WriteClient, destinationSpec specs.Destination, destinationsClient *managedplugin.Client, destinationTransformer *transformer.RecordTransformer, summary *syncSummary, noMigrate bool) error {
summaryTable, err := generateSummaryTable()
if err != nil {
return err
}

csr := caser.New(caser.WithCustomInitialisms(map[string]bool{"CLI": true}), caser.WithCustomExceptions(map[string]string{"cli": "CLI"}))

// Respect the noMigrate flag
if !noMigrate {
if err := migrateSummaryTable(writeClient, destinationTransformer, destinationSpec); err != nil {
return fmt.Errorf("failed to migrate sync summary table: %w", err)
}
}

// Get Information about the DestinationPlugin
m := destinationsClient.Metrics()
summary.DestinationErrors = m.Errors
summary.DestinationWarnings = m.Warnings

summary.DestinationName = destinationSpec.Name
summary.DestinationVersion = destinationSpec.Version
summary.DestinationPath = destinationSpec.Path

resource := schema.NewResourceData(summaryTable, nil, nil)
for _, col := range summaryTable.Columns {
err := resource.Set(col.Name, funk.Get(summary, csr.ToPascal(col.Name), funk.WithAllowZero()))
if err != nil {
return fmt.Errorf("failed to set %s: %w", col.Name, err)
}
}

vector := resource.GetValues()
arrowRecord := vector.ToArrowRecord(resource.Table.ToArrowSchema())

transformedRecord := destinationTransformer.Transform(arrowRecord)
transformedRecordBytes, err := plugin.RecordToBytes(transformedRecord)
if err != nil {
return fmt.Errorf("failed to transform sync summary bytes: %w", err)
}

wr := &plugin.Write_Request{}
wr.Message = &plugin.Write_Request_Insert{
Insert: &plugin.Write_MessageInsert{
Record: transformedRecordBytes,
},
}
if err := writeClient.Send(wr); err != nil {
return handleSendError(err, writeClient, "insert sync summary")
}

return nil
}
19 changes: 13 additions & 6 deletions cli/cmd/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSync(t *testing.T) {
config: "multiple-sources.yml",
summary: []syncSummary{
{
CliVersion: "development",
CLIVersion: "development",
DestinationErrors: 0,
DestinationName: "test",
DestinationPath: "cloudquery/test",
Expand All @@ -41,7 +41,7 @@ func TestSync(t *testing.T) {
SourcePath: "cloudquery/test",
},
{
CliVersion: "development",
CLIVersion: "development",
DestinationErrors: 0,
DestinationName: "test",
DestinationPath: "cloudquery/test",
Expand All @@ -60,15 +60,15 @@ func TestSync(t *testing.T) {
config: "multiple-sources-destinations.yml",
summary: []syncSummary{
{
CliVersion: "development",
CLIVersion: "development",
DestinationName: "test-1",
DestinationPath: "cloudquery/test",
Resources: 12,
SourceName: "test-1",
SourcePath: "cloudquery/test",
},
{
CliVersion: "development",
CLIVersion: "development",
DestinationName: "test-2",
DestinationPath: "cloudquery/test",
Resources: 12,
Expand All @@ -82,7 +82,7 @@ func TestSync(t *testing.T) {
config: "different-backend-from-destination.yml",
summary: []syncSummary{
{
CliVersion: "development",
CLIVersion: "development",
DestinationName: "test1",
DestinationPath: "cloudquery/test",
Resources: 12,
Expand Down Expand Up @@ -125,7 +125,14 @@ func TestSync(t *testing.T) {
if len(tc.summary) > 0 {
summaries := readSummaries(t, summaryPath)
// have to ignore SyncID because it's random and plugin versions since we update those frequently using an automated process
diff := cmp.Diff(tc.summary, summaries, cmpopts.IgnoreFields(syncSummary{}, "SyncID", "DestinationVersion", "SourceVersion"))
// also ignore SyncTime because it's a timestamp
diff := cmp.Diff(tc.summary, summaries, cmpopts.IgnoreFields(syncSummary{}, "SyncID", "DestinationVersion", "SourceVersion", "SyncTime"))
for _, s := range summaries {
assert.NotEmpty(t, s.SyncID)
assert.NotEmpty(t, s.SyncTime)
assert.NotEmpty(t, s.DestinationVersion)
assert.NotEmpty(t, s.SourceVersion)
}
require.Empty(t, diff, "unexpected summaries: %v", diff)
}

Expand Down
57 changes: 34 additions & 23 deletions cli/cmd/sync_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,52 +345,63 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
return fmt.Errorf("unknown message type: %T", m)
}
}

err = syncClient.CloseSend()
if err != nil {
return err
}
totals := sourceClient.Metrics()

for i := range destinationsClients {
if destinationSpecs[i].WriteMode == specs.WriteModeOverwriteDeleteStale {
if err := deleteStale(writeClients[i], tablesForDeleteStale, sourceName, syncTime); err != nil {
return err
}
}
if _, err := writeClients[i].CloseAndRecv(); err != nil {
return err
}
if _, err := destinationsPbClients[i].Close(ctx, &plugin.Close_Request{}); err != nil {
return err
}
}

sourceWarnings := totals.Warnings
sourceErrors := totals.Errors
syncSummaries := make([]syncSummary, len(destinationsClients))
var metadataDataErrors error
for i := range destinationsClients {
m := destinationsClients[i].Metrics()
totals.Warnings += m.Warnings
totals.Errors += m.Errors
syncSummaries[i] = syncSummary{
summary := syncSummary{
Resources: uint64(totalResources),
SourceErrors: sourceErrors,
SourceWarnings: sourceWarnings,
SyncID: uid,
SyncTime: syncTime,
SourceName: sourceSpec.Name,
SourceVersion: sourceSpec.Version,
SourcePath: sourceSpec.Path,
CliVersion: Version,
CLIVersion: Version,
DestinationErrors: m.Errors,
DestinationWarnings: m.Warnings,
DestinationName: destinationSpecs[i].Name,
DestinationVersion: destinationSpecs[i].Version,
DestinationPath: destinationSpecs[i].Path,
}

if err := persistSummary(summaryLocation, summary); err != nil {
log.Warn().Err(err).Msg("Failed to persist sync summary")
}

log.Info().Interface("summary", summary).Msg("Sync summary")
if !destinationSpecs[i].SyncSummary {
continue
}
// Only send the summary to the destination that matches the current destination
if err := sendSummary(writeClients[i], destinationSpecs[i], destinationsClients[i], destinationTransformers[i], &summary, noMigrate); err != nil {
metadataDataErrors = errors.Join(metadataDataErrors, err)
}
}
err = persistSummary(summaryLocation, syncSummaries)
if err != nil {
log.Warn().Err(err).Msg("Failed to persist sync summary")
if metadataDataErrors != nil {
return metadataDataErrors
}

for i := range destinationsClients {
if destinationSpecs[i].WriteMode == specs.WriteModeOverwriteDeleteStale {
if err := deleteStale(writeClients[i], tablesForDeleteStale, sourceName, syncTime); err != nil {
return err
}
}
if _, err := writeClients[i].CloseAndRecv(); err != nil {
return err
}
if _, err := destinationsPbClients[i].Close(ctx, &plugin.Close_Request{}); err != nil {
return err
}
}

err = bar.Finish()
Expand Down
2 changes: 2 additions & 0 deletions cli/internal/specs/v0/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Destination struct {

SyncGroupId string `json:"sync_group_id,omitempty"`

SyncSummary bool `json:"send_sync_summary,omitempty"`

// Destination plugin own (nested) spec
Spec map[string]any `json:"spec,omitempty"`
}
Expand Down
3 changes: 3 additions & 0 deletions cli/internal/specs/v0/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions website/pages/docs/reference/destination-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@ This is useful when splitting a sync into [multiple parallel jobs](https://docs.
The value supports the following placeholders: `{{YEAR}}, {{MONTH}}, {{DAY}}, {{HOUR}}, {{MINUTE}}` which are based on the sync time.
A common use case is to use set `sync_group_id: "{{YEAR}}-{{MONTH}}-{{DAY}}"` to group syncs by day, in order to provide an historical view of the data, partitioned by day.



<!-- vale off -->

### send_sync_summary (preview)

<!-- vale on -->

(`bool`, optional)

When set to `true`, CloudQuery will send a summary of the sync to the destination plugin. The summary includes the number of resources synced, number of errors and details about the plugins (both source and destination). This information will be available in the destination as a separate table named `cloudquery_sync_summaries`.


### spec

(`object`, optional)
Expand Down

0 comments on commit 377bde2

Please sign in to comment.