Skip to content

bqloader is a simple ETL framework to load data from Cloud Storage into BigQuery.

License

Notifications You must be signed in to change notification settings

nownabe/go-bqloader

Repository files navigation

bqloader

PkgGoDev Main Branch Workflow Go Report Card codecov GitHub GitHub tag (latest SemVer)

bqloader is a simple ETL framework to load data from Cloud Storage into BigQuery.

Installation

go get -u go.nownabe.dev/bqloader

Getting Started with Pre-configured Handlers

See the example to get a full instruction.

To load some types of CSV formats, you can use pre-configured handlers. See full list.

package myfunc

import (
	"context"

	"go.nownabe.dev/bqloader"
	"go.nownabe.dev/bqloader/contrib/handlers"
)

var loader bqloader.BQLoader

func init() {
	loader, _ = bqloader.New()

	t := handlers.TableGenerator(os.Getenv("BIGQUERY_PROJECT_ID"), os.Getenv("BIGQUERY_DATASET_ID"))
	n := &bqloader.SlackNotifier{
		Token:   os.Getenv("SLACK_TOKEN"),
		Channel: os.Getenv("SLACK_CHANNEL"),
	}

	handlers.MustAddHandlers(context.Background(), loader,
		/*
			These build handlers to load CSVs, given four arguments:
			handler name, a pattern to file path on Cloud Storage, a BigQuery table and a notifier.
		*/
		handlers.SBISumishinNetBankStatement("SBI Bank", `^sbi_bank/`, t("sbi_bank"), n),
		handlers.SMBCCardStatement("SMBC Card", `^smbc_card/`, t("smbc_card"), n),
	)
}

// BQLoad is the entrypoint for Cloud Functions.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	return loader.Handle(ctx, e)
}

Getting Started with Custom Handlers

(See Quickstart example to get a full instruction.)

To load other CSVs, import the package go.nownabe.dev/bqloader and write your custom handler.

package myfunc

import (
	"context"
	"os"
	"regexp"
	"strings"
	"time"

	"golang.org/x/text/encoding/japanese"
	"golang.org/x/xerrors"

	"go.nownabe.dev/bqloader"
)

var loader bqloader.BQLoader

func init() {
	loader, _ = bqloader.New()
	loader.MustAddHandler(context.Background(), newHandler())
}

func newHandler() *bqloader.Handler {
	/*
		This projector converts date fields formatted as "2006/01/02"
		at the first column into strings like "2006-01-02" that satisfies
		BigQuery date type.
	*/
	projector := func(_ context.Context, r []string) ([]string, error) {
		t, err := time.Parse("2006/01/02", r[0])
		if err != nil {
			return nil, xerrors.Errorf("Column 0 cannot parse as a date: %w", err)
		}

		r[0] = t.Format("2006-01-02")

		return r, nil
	}

	return &bqloader.Handler{
		Name:     "quickstart",                         // Handler name used in logs and notifications.
		Pattern:  regexp.MustCompile("^example_bank/"), // This handler processes files matched to this pattern.
		Encoding: japanese.ShiftJIS,                    // Source file encoding.
		Parser:   bqloader.CSVParser(),                 // Parser parses source file into records.
		Notifier: &bqloader.SlackNotifier{
			Token:   os.Getenv("SLACK_TOKEN"),
			Channel: os.Getenv("SLACK_CHANNEL"),
		},
		Projector:       projector, // Projector transforms each row.
		SkipLeadingRows: 1,         // Skip header row.

		// Destination.
		Project: os.Getenv("BIGQUERY_PROJECT_ID"),
		Dataset: os.Getenv("BIGQUERY_DATASET_ID"),
		Table:   os.Getenv("BIGQUERY_TABLE_ID"),
	}
}

// BQLoad is the entrypoint for Cloud Functions.
func BQLoad(ctx context.Context, e bqloader.Event) error {
	return loader.Handle(ctx, e)
}

Diagram

diagram

About

bqloader is a simple ETL framework to load data from Cloud Storage into BigQuery.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages