Optimize imports for extremely large history sizes for #168 (#171)

Open · wants to merge 4 commits into master
12 changes: 2 additions & 10 deletions client/cmd/install.go
@@ -21,7 +21,6 @@ import (
 	"github.com/ddworken/hishtory/shared"
 	"github.com/google/uuid"
 	"github.com/spf13/cobra"
-	"gorm.io/gorm"
 )
 
 var offlineInit *bool
@@ -41,7 +40,7 @@ var installCmd = &cobra.Command{
 		if os.Getenv("HISHTORY_SKIP_INIT_IMPORT") == "" {
 			db, err := hctx.OpenLocalSqliteDb()
 			lib.CheckFatalError(err)
-			count, err := countStoredEntries(db)
+			count, err := lib.CountStoredEntries(db)
 			lib.CheckFatalError(err)
 			if count < 10 {
 				fmt.Println("Importing existing shell history...")
@@ -65,7 +64,7 @@ var initCmd = &cobra.Command{
 	Run: func(cmd *cobra.Command, args []string) {
 		db, err := hctx.OpenLocalSqliteDb()
 		lib.CheckFatalError(err)
-		count, err := countStoredEntries(db)
+		count, err := lib.CountStoredEntries(db)
 		lib.CheckFatalError(err)
 		if count > 0 {
 			fmt.Printf("Your current hishtory profile has saved history entries, are you sure you want to run `init` and reset?\nNote: This won't clear any imported history entries from your existing shell\n[y/N]")
@@ -128,13 +127,6 @@ var uninstallCmd = &cobra.Command{
 	},
 }
 
-func countStoredEntries(db *gorm.DB) (int64, error) {
-	return lib.RetryingDbFunctionWithResult(func() (int64, error) {
-		var count int64
-		return count, db.Model(&data.HistoryEntry{}).Count(&count).Error
-	})
-}
-
 func warnIfUnsupportedBashVersion() error {
 	_, err := exec.LookPath("bash")
 	if err != nil {
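The helper deleted here reappears below as an exported `lib.CountStoredEntries`, so both the install commands and `Reupload` can ask for the table size without materializing any rows. A minimal sketch of the same count pattern, assuming a toy `HistoryEntry` model and the pure-Go `github.com/glebarez/sqlite` driver (hishtory's real model and its `RetryingDbFunctionWithResult` wrapper live in `client/data` and `client/lib`):

```go
package main

import (
	"fmt"

	"github.com/glebarez/sqlite" // assumed driver choice, for a cgo-free demo
	"gorm.io/gorm"
)

// HistoryEntry is an illustrative stand-in for hishtory's data.HistoryEntry.
type HistoryEntry struct {
	ID      uint
	Command string
}

// countEntries mirrors the shape of lib.CountStoredEntries: a COUNT(*)
// issued through gorm's Model/Count API, so no rows are loaded into memory.
func countEntries(db *gorm.DB) (int64, error) {
	var count int64
	return count, db.Model(&HistoryEntry{}).Count(&count).Error
}

func main() {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	if err := db.AutoMigrate(&HistoryEntry{}); err != nil {
		panic(err)
	}
	db.Create(&HistoryEntry{Command: "ls -la"})

	n, err := countEntries(db)
	if err != nil {
		panic(err)
	}
	fmt.Println("stored entries:", n) // stored entries: 1
}
```

`Count` emits a single `SELECT count(*)`, which is why the `count < 10` and `count > 0` checks above stay cheap even for multi-million-entry histories.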
79 changes: 60 additions & 19 deletions client/lib/lib.go
@@ -669,32 +669,59 @@ func Reupload(ctx context.Context) error {
 	config := hctx.GetConf(ctx)
 	if config.IsOffline {
 		return nil
 	}
-	entries, err := Search(ctx, hctx.GetDb(ctx), "", 0)
+	numEntries, err := CountStoredEntries(hctx.GetDb(ctx))
 	if err != nil {
-		return fmt.Errorf("failed to reupload due to failed search: %w", err)
+		return fmt.Errorf("failed to upload history entries due to error in counting entries: %v", err)
 	}
 	var bar *progressbar.ProgressBar
-	if len(entries) > NUM_IMPORTED_ENTRIES_SLOW {
+	if numEntries > int64(NUM_IMPORTED_ENTRIES_SLOW) {
 		fmt.Println("Persisting history entries")
-		bar = progressbar.Default(int64(len(entries)))
+		bar = progressbar.Default(int64(numEntries))
 		defer bar.Finish()
 	}
-	chunkSize := 500
-	chunks := shared.Chunks(entries, chunkSize)
-	return shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error {
-		jsonValue, err := EncryptAndMarshal(config, chunk)
+
+	// This number is a balance between speed and memory usage. If we make it too high, then
+	// it will mean we use a ton of memory (since we retrieve all of those entries). But if
+	// we make it too low, then it will have to do repeated SQL queries with OFFSETs, which
+	// are inherently slow.
+	searchChunkSize := 300_000
+	currentOffset := 0
+	for {
+		entries, err := SearchWithOffset(ctx, hctx.GetDb(ctx), "", searchChunkSize, currentOffset)
 		if err != nil {
-			return fmt.Errorf("failed to reupload due to failed encryption: %w", err)
+			return fmt.Errorf("failed to reupload due to failed search: %w", err)
 		}
-		_, err = ApiPost(ctx, "/api/v1/submit?source_device_id="+config.DeviceId, "application/json", jsonValue)
-		if err != nil {
-			return fmt.Errorf("failed to reupload due to failed POST: %w", err)
+		if len(entries) == 0 {
+			if currentOffset == 0 && numEntries != 0 {
+				return fmt.Errorf("found no entries for reuploading, something went wrong")
+			} else {
+				return nil
+			}
 		}
-		if bar != nil {
-			_ = bar.Add(chunkSize)
+		currentOffset += searchChunkSize
+		// This number is a balance between speed, and ensuring that we don't send too much data
+		// in a single request (since large individual requests are extremely slow). From benchmarking,
+		// it is apparent that this value seems to work quite well.
+		uploadChunkSize := 500
+		chunks := shared.Chunks(entries, uploadChunkSize)
+		err = shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error {
+			jsonValue, err := EncryptAndMarshal(config, chunk)
+			if err != nil {
+				return fmt.Errorf("failed to reupload due to failed encryption: %w", err)
+			}
+			_, err = ApiPost(ctx, "/api/v1/submit?source_device_id="+config.DeviceId, "application/json", jsonValue)
+			if err != nil {
+				return fmt.Errorf("failed to reupload due to failed POST: %w", err)
+			}
+			if bar != nil {
+				_ = bar.Add(uploadChunkSize)
+			}
+			return nil
+		})
+		if err != nil {
+			return err
 		}
-		return nil
-	})
+	}
 }
 
 func RetrieveAdditionalEntriesFromRemote(ctx context.Context, queryReason string) error {
@@ -832,12 +859,16 @@ func MakeWhereQueryFromSearch(ctx context.Context, db *gorm.DB, query string) (*gorm.DB, error) {
 }
 
 func Search(ctx context.Context, db *gorm.DB, query string, limit int) ([]*data.HistoryEntry, error) {
-	return retryingSearch(ctx, db, query, limit, 0)
+	return SearchWithOffset(ctx, db, query, limit, 0)
 }
 
+func SearchWithOffset(ctx context.Context, db *gorm.DB, query string, limit, offset int) ([]*data.HistoryEntry, error) {
+	return retryingSearch(ctx, db, query, limit, offset, 0)
+}
+
 const SEARCH_RETRY_COUNT = 3
 
-func retryingSearch(ctx context.Context, db *gorm.DB, query string, limit int, currentRetryNum int) ([]*data.HistoryEntry, error) {
+func retryingSearch(ctx context.Context, db *gorm.DB, query string, limit, offset int, currentRetryNum int) ([]*data.HistoryEntry, error) {
 	if ctx == nil && query != "" {
 		return nil, fmt.Errorf("lib.Search called with a nil context and a non-empty query (this should never happen)")
 	}
@@ -855,13 +886,16 @@ func retryingSearch(ctx context.Context, db *gorm.DB, query string, limit int, currentRetryNum int) ([]*data.HistoryEntry, error) {
 	if limit > 0 {
 		tx = tx.Limit(limit)
 	}
+	if offset > 0 {
+		tx = tx.Offset(offset)
+	}
 	var historyEntries []*data.HistoryEntry
 	result := tx.Find(&historyEntries)
 	if result.Error != nil {
 		if strings.Contains(result.Error.Error(), SQLITE_LOCKED_ERR_MSG) && currentRetryNum < SEARCH_RETRY_COUNT {
 			hctx.GetLogger().Infof("Ignoring err=%v and retrying search query, cnt=%d", result.Error, currentRetryNum)
 			time.Sleep(time.Duration(currentRetryNum*rand.Intn(50)) * time.Millisecond)
-			return retryingSearch(ctx, db, query, limit, currentRetryNum+1)
+			return retryingSearch(ctx, db, query, limit, offset, currentRetryNum+1)
 		}
 		return nil, fmt.Errorf("DB query error: %w", result.Error)
 	}
@@ -1069,3 +1103,10 @@ func SendDeletionRequest(ctx context.Context, deletionRequest shared.DeletionRequest) error {
 	}
 	return nil
 }
+
+func CountStoredEntries(db *gorm.DB) (int64, error) {
+	return RetryingDbFunctionWithResult(func() (int64, error) {
+		var count int64
+		return count, db.Model(&data.HistoryEntry{}).Count(&count).Error
+	})
+}
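The two inline comments in `Reupload` above carry the core idea of this change: two-level chunking. The outer loop pages through the database in large `searchChunkSize` slices, which bounds memory use without issuing many slow OFFSET queries; the inner loop re-chunks each slice into small `uploadChunkSize` batches, which bounds the size of each POST. A self-contained toy sketch of that shape, where `fetchPage` and `upload` are hypothetical stand-ins for `SearchWithOffset` and the encrypt-and-POST step:

```go
package main

import "fmt"

// fetchPage stands in for SearchWithOffset: it returns up to limit items
// starting at offset, and an empty slice once the data is exhausted.
func fetchPage(all []string, limit, offset int) []string {
	if offset >= len(all) {
		return nil
	}
	end := offset + limit
	if end > len(all) {
		end = len(all)
	}
	return all[offset:end]
}

// upload stands in for the per-chunk encrypt-and-POST step.
func upload(batch []string) error {
	fmt.Printf("uploading %d entries\n", len(batch))
	return nil
}

func main() {
	data := make([]string, 2500)
	pageSize := 1000 // analogous to searchChunkSize (300_000 in the PR)
	batchSize := 400 // analogous to uploadChunkSize (500 in the PR)

	// Outer loop: page through storage in large slices to bound memory.
	for offset := 0; ; offset += pageSize {
		page := fetchPage(data, pageSize, offset)
		if len(page) == 0 {
			break // exhausted
		}
		// Inner loop: split each slice into small batches to bound request size.
		for i := 0; i < len(page); i += batchSize {
			end := i + batchSize
			if end > len(page) {
				end = len(page)
			}
			if err := upload(page[i:end]); err != nil {
				panic(err)
			}
		}
	}
}
```

Tuning the two constants independently is the point of the design: the page size trades memory for OFFSET-query count, while the batch size trades request count for per-request latency.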
1 change: 1 addition & 0 deletions shared/testutils/testutils.go
@@ -86,6 +86,7 @@ func BackupAndRestoreWithId(t testing.TB, id string) func() {
 		path.Join(homedir, data.GetHishtoryPath(), "hishtory"),
 		path.Join(homedir, ".bash_history"),
 		path.Join(homedir, ".zsh_history"),
+		path.Join(homedir, ".zhistory"),
 		path.Join(homedir, ".local/share/fish/fish_history"),
 	}
 	for _, file := range renameFiles {
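One closing note on the `SearchWithOffset` plumbing: gorm's `Limit`/`Offset` calls chain onto the query builder and translate directly to SQL `LIMIT ... OFFSET ...`. OFFSET paging is only deterministic under a stable `ORDER BY`, though; without one, the database is free to return rows in a different order on each query, silently skipping or duplicating entries across pages. A sketch making that explicit, again with an illustrative model and the assumed `glebarez/sqlite` driver (hishtory's real query construction lives in `MakeWhereQueryFromSearch`):

```go
package main

import (
	"fmt"

	"github.com/glebarez/sqlite" // assumed cgo-free driver for the demo
	"gorm.io/gorm"
)

type Entry struct {
	ID      uint
	Command string
}

// pageEntries mirrors the Limit/Offset handling added to retryingSearch,
// plus an explicit stable ordering so consecutive pages never overlap.
func pageEntries(db *gorm.DB, limit, offset int) ([]Entry, error) {
	tx := db.Model(&Entry{}).Order("id ASC")
	if limit > 0 {
		tx = tx.Limit(limit)
	}
	if offset > 0 {
		tx = tx.Offset(offset)
	}
	var entries []Entry
	return entries, tx.Find(&entries).Error
}

func main() {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	if err := db.AutoMigrate(&Entry{}); err != nil {
		panic(err)
	}
	for i := 0; i < 5; i++ {
		db.Create(&Entry{Command: fmt.Sprintf("cmd-%d", i)})
	}

	page, err := pageEntries(db, 2, 2) // second page of size 2
	if err != nil {
		panic(err)
	}
	for _, e := range page {
		fmt.Println(e.Command) // cmd-2, then cmd-3
	}
}
```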