Skip to content

Commit

Permalink
add ProductAvailabilitySnapshot purging job (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
PhillippOhlandt committed May 7, 2023
1 parent 34c0e90 commit f9744f7
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 3 deletions.
5 changes: 3 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ config :phoenix, :json_library, Jason

config :nerves_metal_detector, Oban,
repo: NervesMetalDetector.Repo,
queues: [default: 1, product_updates: 15],
queues: [default: 1, product_updates: 15, product_availability_snapshots_purge: 1],
plugins: [
Oban.Plugins.Pruner,
{Oban.Plugins.Cron,
crontab: [
{"0 * * * *", NervesMetalDetector.Jobs.ScheduleProductUpdates}
{"0 * * * *", NervesMetalDetector.Jobs.ScheduleProductUpdates},
{"10 0 * * *", NervesMetalDetector.Jobs.ScheduleProductAvailabilitySnapshotsPurges}
]}
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule NervesMetalDetector.Inventory.ProductAvailabilitySnapshot do
field :in_stock, :boolean
field :items_in_stock, :integer
field :price, Money.Ecto.Composite.Type
field :fetched_at, :utc_datetime
field :fetched_at, :utc_datetime, primary_key: true
end

@doc false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule NervesMetalDetector.Jobs.ProductAvailabilitySnapshotsPurge do
use Oban.Worker,
queue: :product_availability_snapshots_purge,
max_attempts: 3

import Ecto.Query, only: [dynamic: 2]

alias NervesMetalDetector.Inventory
alias NervesMetalDetector.Inventory.ProductAvailabilitySnapshot
alias NervesMetalDetector.TimeSeries
alias NervesMetalDetector.Repo

@impl Oban.Worker
def perform(%Oban.Job{args: %{"vendor" => vendor, "sku" => sku}}) do
snapshots = list_snapshots(vendor, sku)

{_, to_delete} =
TimeSeries.consecutive_dedup(snapshots, fn %ProductAvailabilitySnapshot{} = snapshot,
_index,
_total_count ->
{DateTime.to_date(snapshot.fetched_at), snapshot.price, snapshot.items_in_stock,
snapshot.in_stock}
end)

for snapshot <- to_delete do
Process.sleep(50)
Repo.delete(snapshot)
end

:ok
end

defp list_snapshots(vendor, sku) do
two_weeks_ago =
DateTime.now!("Etc/UTC")
|> DateTime.to_date()
|> Date.add(-13)
|> DateTime.new!(Time.new!(0, 0, 0), "Etc/UTC")

Inventory.list_product_availability_snapshots(
dynamic(
[s],
s.vendor == ^vendor and s.sku == ^sku and s.fetched_at < ^two_weeks_ago
)
)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule NervesMetalDetector.Jobs.ScheduleProductAvailabilitySnapshotsPurges do
use Oban.Worker,
queue: :product_availability_snapshots_purge,
max_attempts: 1

alias NervesMetalDetector.Vendors
alias NervesMetalDetector.Inventory.Data.ProductUpdateItems
alias NervesMetalDetector.Jobs.ProductAvailabilitySnapshotsPurge

@impl Oban.Worker
def perform(_job) do
vendors = Vendors.all()

for vendor <- vendors do
{:ok, product_update_items} = ProductUpdateItems.get_by_vendor(vendor)

for item <- product_update_items do
ProductAvailabilitySnapshotsPurge.new(%{vendor: vendor.id, sku: item.sku})
|> Oban.insert()
end
end

:ok
end
end

0 comments on commit f9744f7

Please sign in to comment.