diff --git a/components/electric/lib/electric/plug/migrations.ex b/components/electric/lib/electric/plug/migrations.ex index f53f810d84..526115fb81 100644 --- a/components/electric/lib/electric/plug/migrations.ex +++ b/components/electric/lib/electric/plug/migrations.ex @@ -106,7 +106,7 @@ defmodule Electric.Plug.Migrations do schema_version = SchemaLoader.Version.new(version, schema) {:ok, msgs, _relations} = - Electric.Postgres.Replication.migrate(schema_version, stmt, dialect) + Electric.Postgres.Migration.to_op(stmt, schema_version, dialect) msgs end) diff --git a/components/electric/lib/electric/postgres/extension.ex b/components/electric/lib/electric/postgres/extension.ex index c1c58ca9cc..e063f25646 100644 --- a/components/electric/lib/electric/postgres/extension.ex +++ b/components/electric/lib/electric/postgres/extension.ex @@ -143,7 +143,7 @@ defmodule Electric.Postgres.Extension do defguard is_extension_relation(relation) when elem(relation, 0) == @schema defguard is_migration_relation(relation) - when relation in [{@schema, @version_relation}, {@schema, @ddl_relation}] + when relation in [{@schema, @electrified_tracking_relation}, {@schema, @ddl_relation}] defguard is_ddl_relation(relation) when relation == {@schema, @ddl_relation} @@ -156,6 +156,10 @@ defmodule Electric.Postgres.Extension do {:ok, query} end + def extract_ddl_txid(%{"txid" => txid, "txts" => txts}) do + {:ok, {txid, txts}} + end + def schema_version(conn, version) do with {:ok, [_, _], rows} <- :epgsql.equery(conn, @schema_version_query, [version]) do case rows do diff --git a/components/electric/lib/electric/postgres/migration.ex b/components/electric/lib/electric/postgres/migration.ex new file mode 100644 index 0000000000..a0e9abf294 --- /dev/null +++ b/components/electric/lib/electric/postgres/migration.ex @@ -0,0 +1,223 @@ +defmodule Electric.Postgres.Migration do + use Electric.Satellite.Protobuf + + alias PgQuery, as: Pg + + alias Electric.Postgres.{ + CachedWal, + Dialect, + Extension, + Extension.SchemaLoader, + Schema.AST, + Schema.Proto + } + + alias Electric.Replication.Changes + alias Electric.Replication.Connectors + + @default_dialect Dialect.SQLite + + @doc """ + Convert migration history entries to a list of migration transactions. 
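+
+  Each history entry becomes a `%Changes.Transaction{}` wrapping a single
+  `%Changes.Migration{}` change. An illustrative sketch, with `migrations`
+  standing in for the result of a `SchemaCache.migration_history/2` call:
+
+      {:ok, migrations} = Extension.SchemaCache.migration_history(origin, version)
+
+      [%Changes.Transaction{changes: [%Changes.Migration{}]} | _] =
+        to_transactions(migrations, origin, lsn)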
+ """ + @spec to_transactions([Extension.Migration.t()], Connectors.origin(), CachedWal.Api.wal_pos()) :: + [Changes.Transaction.t()] + def to_transactions(migrations, origin, lsn) do + publication = Extension.publication_name() + + Enum.map(migrations, fn %Extension.Migration{} = migration -> + schema_version = SchemaLoader.Version.new(migration.version, migration.schema) + {ops, relations} = to_ops(migration.stmts, schema_version) + + %Changes.Transaction{ + xid: migration.txid, + changes: [ + %Changes.Migration{ + version: migration.version, + schema: schema_version, + ddl: migration.stmts, + ops: ops, + relations: relations + } + ], + commit_timestamp: migration.timestamp, + origin: origin, + publication: publication, + lsn: lsn + } + end) + end + + @doc false + @spec to_ops([String.t()], SchemaLoader.Version.t()) :: + {Changes.Migration.Ops.t(), [Electric.Postgres.relation()]} + def to_ops(stmts, schema_version) do + stmts + |> Enum.reduce({%Changes.Migration.Ops{}, MapSet.new()}, fn stmt, {ops, relations} -> + Changes.Migration.dialects() + |> Enum.reduce(ops, fn {key, dialect}, ops -> + {:ok, new_ops, new_relations} = to_op(stmt, schema_version, dialect) + + {Map.update!(ops, key, &(&1 ++ new_ops)), Enum.into(new_relations, relations)} + end) + end) + |> then(fn {ops, relations} -> {ops, MapSet.to_list(relations)} end) + end + + # We get a list of sql statements and a schema: + # + # 1. generate the sqlite sql from the ast + # 2. get the list of tables involved in the migration + # 3. use the updated schema to get column, fk and pk information for the affected tables + # + # - creation of indexes doesn't affect any tables so that list should be empty + @spec to_op(String.t(), SchemaLoader.Version.t(), Electric.Postgres.Dialect.t()) :: + {:ok, [%SatOpMigrate{}], [Electric.Postgres.relation()]} + def to_op(stmt, schema_version, dialect \\ @default_dialect) do + ast = Electric.Postgres.parse!(stmt) + + case propagatable_stmt?(ast) do + [] -> + {:ok, [], []} + + propagate_ast -> + {msg, relations} = build_replication_msg(propagate_ast, schema_version, dialect) + + {:ok, [msg], relations} + end + end + + def stmt_type(%Pg.CreateStmt{}) do + :CREATE_TABLE + end + + def stmt_type(%Pg.IndexStmt{}) do + :CREATE_INDEX + end + + def stmt_type(%Pg.AlterTableStmt{cmds: [cmd]}) do + case cmd do + %{node: {:alter_table_cmd, %Pg.AlterTableCmd{subtype: :AT_AddColumn}}} -> + :ALTER_ADD_COLUMN + end + end + + def affected_tables(stmts, dialect \\ @default_dialect) when is_list(stmts) do + stmts + |> Enum.flat_map(&get_affected_table/1) + |> Enum.uniq_by(&Dialect.table_name(&1, dialect)) + end + + defp get_affected_table(%Pg.CreateStmt{relation: relation}) do + [AST.map(relation)] + end + + defp get_affected_table(%Pg.AlterTableStmt{relation: relation}) do + [AST.map(relation)] + end + + defp get_affected_table(%Pg.IndexStmt{}) do + [] + end + + defp get_affected_table(_stmt) do + [] + end + + defp build_replication_msg(ast, schema_version, dialect) do + affected_tables = affected_tables(ast, dialect) + + relations = Enum.map(affected_tables, &{&1.schema, &1.name}) + + tables = + affected_tables + |> Enum.map(&SchemaLoader.Version.table!(schema_version, &1)) + |> Enum.map(&replication_msg_table(&1, dialect)) + + table = + case tables do + [] -> nil + [table] -> table + end + + stmts = + Enum.map( + ast, + &%SatOpMigrate.Stmt{ + type: stmt_type(&1), + sql: Dialect.to_sql(&1, dialect) + } + ) + + {%SatOpMigrate{ + version: SchemaLoader.Version.version(schema_version), + table: table, + stmts: stmts + }, 
relations} + end + + defp propagatable_stmt?(ast) do + Enum.filter(ast, fn + %Pg.CreateStmt{} -> + true + + %Pg.IndexStmt{} -> + true + + %Pg.AlterTableStmt{ + cmds: [%{node: {:alter_table_cmd, %Pg.AlterTableCmd{subtype: :AT_AddColumn}}}] + } -> + true + + _else -> + false + end) + end + + defp replication_msg_table(%Proto.Table{} = table, dialect) do + %SatOpMigrate.Table{ + name: Dialect.table_name(table.name, dialect), + columns: Enum.map(table.columns, &replication_msg_table_col(&1, dialect)), + fks: Enum.flat_map(table.constraints, &replication_msg_table_fk(&1, dialect)), + pks: Enum.flat_map(table.constraints, &replication_msg_table_pk(&1, dialect)) + } + end + + defp replication_msg_table_col(%Proto.Column{} = column, dialect) do + %SatOpMigrate.Column{ + name: column.name, + pg_type: replication_msg_table_col_type(column.type), + sqlite_type: Dialect.type_name(column.type, dialect) + } + end + + defp replication_msg_table_col_type(%Proto.Column.Type{} = type) do + %SatOpMigrate.PgColumnType{ + name: type.name, + array: type.array, + size: type.size + } + end + + defp replication_msg_table_pk(%Proto.Constraint{constraint: {:primary, pk}}, _dialect) do + pk.keys + end + + defp replication_msg_table_pk(_constraint, _dialect) do + [] + end + + defp replication_msg_table_fk(%Proto.Constraint{constraint: {:foreign, fk}}, dialect) do + [ + %SatOpMigrate.ForeignKey{ + fk_cols: fk.fk_cols, + pk_cols: fk.pk_cols, + pk_table: Dialect.table_name(fk.pk_table, dialect) + } + ] + end + + defp replication_msg_table_fk(_constraint, _dialect) do + [] + end +end diff --git a/components/electric/lib/electric/postgres/migration/state.ex b/components/electric/lib/electric/postgres/migration/state.ex new file mode 100644 index 0000000000..17673bd597 --- /dev/null +++ b/components/electric/lib/electric/postgres/migration/state.ex @@ -0,0 +1,172 @@ +defmodule Electric.Postgres.Migration.State do + alias Electric.Replication.Changes + alias Electric.Replication.Changes.Migration + alias Electric.Replication.Changes.NewRecord + + alias Electric.Postgres.{ + Extension, + Extension.SchemaLoader, + Schema + } + + alias Electric.Telemetry.Metrics + + import Electric.Postgres.Extension, + only: [ + is_ddl_relation: 1, + is_migration_relation: 1 + ] + + require Logger + + @type apply_migration_opt() :: + {:schema_change_handler, (SchemaLoader.Version.t(), SchemaLoader.Version.t() -> none())} + @type apply_migration_opts() :: [apply_migration_opt()] + @type data_changes() :: [Changes.data_change()] + @type update_opt() :: apply_migration_opt() | {:skip_applied, boolean()} + @type update_opts() :: [update_opt()] + + # useful for testing + @doc false + @spec convert(data_changes(), SchemaLoader.t()) :: [Changes.change()] + def convert(changes, loader) do + {changes, _loader} = + chunk_convert(changes, loader, fn version, _stmts, loader -> + {:ok, schema_version} = SchemaLoader.load(loader, version) + {schema_version, loader} + end) + + changes + end + + @doc """ + Update Electric's migration state with a set of changes from the replication stream. 
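+
+  Runs of `%NewRecord{}` changes against the DDL tracking relation are grouped
+  by transaction version and replaced with consolidated `%Changes.Migration{}`
+  structs, applying each migration to the schema via the given loader. A
+  sketch mirroring its use in `MigrationConsumer` (the `&on_schema_change/2`
+  callback is a hypothetical stand-in):
+
+      {changes, loader} =
+        update(changes, loader,
+          schema_change_handler: &on_schema_change/2,
+          skip_applied: true
+        )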
+ """ + @spec update(data_changes(), SchemaLoader.t(), update_opts()) :: + {[Changes.change()], SchemaLoader.t()} + def update(changes, loader, opts \\ []) do + chunk_convert(changes, loader, opts, fn version, stmts, loader -> + {_schema_version, _loader} = perform_migration(version, stmts, loader, opts) + end) + end + + defp chunk_convert(changes, loader, opts \\ [], version_callback) do + changes + |> chunk_migrations() + |> Enum.flat_map_reduce(loader, fn + [%{relation: relation} | _] = changes, loader when is_migration_relation(relation) -> + changes + |> transaction_changes_to_migrations(loader) + |> skip_applied_migrations(loader, opts[:skip_applied]) + |> Enum.map_reduce(loader, fn {version, stmts}, loader -> + {schema_version, loader} = version_callback.(version, stmts, loader) + + {migration(version, schema_version, stmts), loader} + end) + + changes, loader -> + {changes, loader} + end) + end + + defp chunk_migrations(changes) do + Enum.chunk_by(changes, fn + %NewRecord{relation: relation} -> is_migration_relation(relation) + _ -> false + end) + end + + defp migration(version, schema_version, stmts) do + {ops, relations} = Electric.Postgres.Migration.to_ops(stmts, schema_version) + + %Migration{ + version: version, + schema: schema_version, + ddl: stmts, + ops: ops, + relations: relations + } + end + + defp skip_applied_migrations(migrations, loader, true) do + {:ok, %{version: current_schema_version}} = SchemaLoader.load(loader) + + Enum.drop_while(migrations, fn {version, _stmts} -> version <= current_schema_version end) + end + + defp skip_applied_migrations(changes, _loader, _false) do + changes + end + + defp transaction_changes_to_migrations(changes, loader) do + changes + |> Enum.filter(fn + %NewRecord{relation: relation} -> is_ddl_relation(relation) + _ -> false + end) + |> Enum.map_reduce(%{}, fn %{record: record}, version_cache -> + {version, version_cache} = ddl_statement_version(record, version_cache, loader) + {:ok, sql} = Extension.extract_ddl_sql(record) + {{version, sql}, version_cache} + end) + |> elem(0) + |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) + end + + defp ddl_statement_version(record, version_cache, loader) do + {:ok, txid} = Extension.extract_ddl_txid(record) + + case Map.fetch(version_cache, txid) do + {:ok, version} -> + {version, version_cache} + + :error -> + {:ok, version} = SchemaLoader.tx_version(loader, record) + {version, Map.put(version_cache, txid, version)} + end + end + + defp perform_migration(version, stmts, loader, opts) do + {:ok, loader, schema_version} = apply_migration(version, stmts, loader, opts) + + Metrics.non_span_event( + [:postgres, :migration], + %{electrified_tables: Schema.num_electrified_tables(schema_version.schema)}, + %{migration_version: version} + ) + + {schema_version, loader} + end + + @doc """ + Apply a migration, composed of a version and a list of DDL statements, to a schema + using the given implementation of SchemaLoader. 
+ """ + @spec apply_migration(String.t(), [String.t()], SchemaLoader.t(), apply_migration_opts()) :: + {:ok, SchemaLoader.t(), SchemaLoader.Version.t()} | {:error, term()} + def apply_migration(version, stmts, loader, opts \\ []) + when is_binary(version) and is_list(stmts) do + with {:ok, old_schema_version} <- SchemaLoader.load(loader) do + Logger.info("Migrating version #{old_schema_version.version || ""} -> #{version}") + + oid_loader = &SchemaLoader.relation_oid(loader, &1, &2, &3) + + schema = + stmts + |> Enum.reduce(old_schema_version.schema, fn stmt, schema -> + Logger.info("Applying migration #{version}: #{inspect(stmt)}") + Schema.update(schema, stmt, oid_loader: oid_loader) + end) + |> Schema.add_shadow_tables(oid_loader: oid_loader) + + Logger.info("Saving schema version #{version}") + + with {:ok, loader, new_schema_version} <- SchemaLoader.save(loader, version, schema, stmts) do + if change_handler = opts[:schema_change_handler], + do: change_handler.(old_schema_version, new_schema_version) + + {:ok, loader, new_schema_version} + end + end + end +end diff --git a/components/electric/lib/electric/postgres/replication.ex b/components/electric/lib/electric/postgres/replication.ex index 8b08891b7d..0e061bb4fd 100644 --- a/components/electric/lib/electric/postgres/replication.ex +++ b/components/electric/lib/electric/postgres/replication.ex @@ -1,9 +1,4 @@ defmodule Electric.Postgres.Replication do - use Electric.Satellite.Protobuf - - alias PgQuery, as: Pg - alias Electric.Postgres.{Dialect, Extension.SchemaLoader, Schema.AST, Schema.Proto} - defmodule Column do alias Electric.Postgres @@ -47,168 +42,4 @@ defmodule Electric.Postgres.Replication do columns: [Column.t()] } end - - @type version() :: binary() - - @default_dialect Dialect.SQLite - - # we get a list of sql statements and a schema - # 1. update the schema with the ddl - # 2. generate the sqlite sql from the ast - # 3. get the list of tables involved in the migration - # 4. 
use the updated schema to get column, fk and pk information for the affected tables - # - # - creation of indexes doesn't affect any tables so that list should be empty - @spec migrate(SchemaLoader.Version.t(), binary(), Electric.Postgres.Dialect.t()) :: - {:ok, [%SatOpMigrate{}], [{binary, binary}]} - def migrate(schema_version, stmt, dialect \\ @default_dialect) do - ast = Electric.Postgres.parse!(stmt) - - case propagatable_stmt?(ast) do - [] -> - {:ok, [], []} - - propagate_ast -> - {msg, relations} = build_replication_msg(propagate_ast, schema_version, dialect) - - {:ok, [msg], relations} - end - end - - def stmt_type(%Pg.CreateStmt{}) do - :CREATE_TABLE - end - - def stmt_type(%Pg.IndexStmt{}) do - :CREATE_INDEX - end - - def stmt_type(%Pg.AlterTableStmt{cmds: [cmd]}) do - case cmd do - %{node: {:alter_table_cmd, %Pg.AlterTableCmd{subtype: :AT_AddColumn}}} -> - :ALTER_ADD_COLUMN - end - end - - def affected_tables(stmts, dialect \\ @default_dialect) when is_list(stmts) do - stmts - |> Enum.flat_map(&get_affected_table/1) - |> Enum.uniq_by(&Dialect.table_name(&1, dialect)) - end - - defp get_affected_table(%Pg.CreateStmt{relation: relation}) do - [AST.map(relation)] - end - - defp get_affected_table(%Pg.AlterTableStmt{relation: relation}) do - [AST.map(relation)] - end - - defp get_affected_table(%Pg.IndexStmt{}) do - [] - end - - defp get_affected_table(_stmt) do - [] - end - - defp build_replication_msg(ast, schema_version, dialect) do - affected_tables = affected_tables(ast, dialect) - - relations = Enum.map(affected_tables, &{&1.schema, &1.name}) - - tables = - affected_tables - |> Enum.map(&SchemaLoader.Version.table!(schema_version, &1)) - |> Enum.map(&replication_msg_table(&1, dialect)) - - table = - case tables do - [] -> nil - [table] -> table - end - - stmts = - Enum.map( - ast, - &%SatOpMigrate.Stmt{ - type: stmt_type(&1), - sql: Dialect.to_sql(&1, dialect) - } - ) - - {%SatOpMigrate{ - version: SchemaLoader.Version.version(schema_version), - table: table, - stmts: stmts - }, relations} - end - - # FIXME: not all ddl commands are suitable for passing to the clients. - # these should be filtered by the event trigger function. 
in lieu of that - # filter them here - defp propagatable_stmt?(ast) do - Enum.filter(ast, fn - %Pg.CreateStmt{} -> - true - - %Pg.IndexStmt{} -> - true - - %Pg.AlterTableStmt{ - cmds: [%{node: {:alter_table_cmd, %Pg.AlterTableCmd{subtype: :AT_AddColumn}}}] - } -> - true - - _else -> - false - end) - end - - defp replication_msg_table(%Proto.Table{} = table, dialect) do - %SatOpMigrate.Table{ - name: Dialect.table_name(table.name, dialect), - columns: Enum.map(table.columns, &replication_msg_table_col(&1, dialect)), - fks: Enum.flat_map(table.constraints, &replication_msg_table_fk(&1, dialect)), - pks: Enum.flat_map(table.constraints, &replication_msg_table_pk(&1, dialect)) - } - end - - defp replication_msg_table_col(%Proto.Column{} = column, dialect) do - %SatOpMigrate.Column{ - name: column.name, - pg_type: replication_msg_table_col_type(column.type), - sqlite_type: Dialect.type_name(column.type, dialect) - } - end - - defp replication_msg_table_col_type(%Proto.Column.Type{} = type) do - %SatOpMigrate.PgColumnType{ - name: type.name, - array: type.array, - size: type.size - } - end - - defp replication_msg_table_pk(%Proto.Constraint{constraint: {:primary, pk}}, _dialect) do - pk.keys - end - - defp replication_msg_table_pk(_constraint, _dialect) do - [] - end - - defp replication_msg_table_fk(%Proto.Constraint{constraint: {:foreign, fk}}, dialect) do - [ - %SatOpMigrate.ForeignKey{ - fk_cols: fk.fk_cols, - pk_cols: fk.pk_cols, - pk_table: Dialect.table_name(fk.pk_table, dialect) - } - ] - end - - defp replication_msg_table_fk(_constraint, _dialect) do - [] - end end diff --git a/components/electric/lib/electric/replication/changes.ex b/components/electric/lib/electric/replication/changes.ex index 4c48134d6b..dd2da77a7d 100644 --- a/components/electric/lib/electric/replication/changes.ex +++ b/components/electric/lib/electric/replication/changes.ex @@ -26,11 +26,15 @@ defmodule Electric.Replication.Changes do @type tag() :: String.t() @type pk() :: [String.t(), ...] 
- @type change() :: + @type data_change() :: Changes.NewRecord.t() | Changes.UpdatedRecord.t() | Changes.DeletedRecord.t() + + @type change() :: + data_change() | Changes.UpdatedPermissions.t() + | Changes.Migration.t() defmodule Transaction do alias Electric.Replication.Changes @@ -83,7 +87,8 @@ defmodule Electric.Replication.Changes do deletes: 0, compensations: 0, truncates: 0, - gone: 0 + gone: 0, + migration: 0 } Enum.reduce(changes, base, fn %module{}, acc -> @@ -95,6 +100,7 @@ defmodule Electric.Replication.Changes do Changes.Compensation -> :compensations Changes.TruncatedRelation -> :truncates Changes.Gone -> :gone + Changes.Migration -> :migration end Map.update!(%{acc | operations: acc.operations + 1}, key, &(&1 + 1)) @@ -241,6 +247,48 @@ defmodule Electric.Replication.Changes do | %__MODULE__{type: :global, permissions: GlobalPermissions.t()} end + defmodule Migration do + alias Electric.Postgres.Extension.SchemaLoader + alias Electric.Postgres + + @relation Electric.Postgres.Extension.ddl_relation() + + defmodule Ops do + defstruct sqlite: [], postgres: [] + + @type t() :: %__MODULE__{ + sqlite: [%Electric.Satellite.SatOpMigrate{}], + postgres: [%Electric.Satellite.SatOpMigrate{}] + } + end + + defstruct [ + :version, + :schema, + :ddl, + :ops, + :relations, + # give this message a relation just to make it more compatible with other messages + relation: @relation + ] + + @type t() :: %__MODULE__{ + version: SchemaLoader.version(), + schema: SchemaLoader.Version.t(), + ddl: [String.t(), ...], + ops: Ops.t(), + relations: [Postgres.relation()], + relation: Postgres.relation() + } + + @spec dialects() :: [{atom(), Electric.Postgres.Dialect.t()}] + def dialects do + [ + sqlite: Electric.Postgres.Dialect.SQLite + ] + end + end + @spec filter_changes_belonging_to_user(Transaction.t(), binary()) :: Transaction.t() def filter_changes_belonging_to_user(%Transaction{changes: changes} = tx, user_id) do %{tx | changes: Enum.filter(changes, &Changes.Ownership.change_belongs_to_user?(&1, user_id))} diff --git a/components/electric/lib/electric/replication/changes/ownership.ex b/components/electric/lib/electric/replication/changes/ownership.ex index d584ee6a62..2fde477a03 100644 --- a/components/electric/lib/electric/replication/changes/ownership.ex +++ b/components/electric/lib/electric/replication/changes/ownership.ex @@ -29,6 +29,10 @@ defmodule Electric.Replication.Changes.Ownership do validate_record(record, user_id, owner_column) end + def change_belongs_to_user?(%Changes.Migration{}, _user_id, _owner_column) do + true + end + @empty [nil, ""] @spec validate_record(Changes.record(), Auth.user_id(), Changes.db_identifier()) :: boolean() diff --git a/components/electric/lib/electric/replication/initial_sync.ex b/components/electric/lib/electric/replication/initial_sync.ex index d5779d4ad2..2da044c6b9 100644 --- a/components/electric/lib/electric/replication/initial_sync.ex +++ b/components/electric/lib/electric/replication/initial_sync.ex @@ -9,8 +9,8 @@ defmodule Electric.Replication.InitialSync do alias Electric.Utils alias Electric.Telemetry.Metrics alias Electric.Replication.Shapes - alias Electric.Postgres.{CachedWal, Extension} - alias Electric.Replication.Changes.{NewRecord, Transaction} + alias Electric.Postgres.{CachedWal, Extension, Migration} + alias Electric.Replication.Changes.Transaction alias Electric.Replication.Connectors alias Electric.Replication.Postgres.Client @@ -29,32 +29,9 @@ defmodule Electric.Replication.InitialSync do Transaction.t() ] def 
migrations_since(version, origin, lsn \\ 0) do - publication = Extension.publication_name() {:ok, migrations} = Extension.SchemaCache.migration_history(origin, version) - for migration <- migrations do - records = - for sql <- migration.stmts do - %NewRecord{ - relation: Extension.ddl_relation(), - record: %{ - "version" => migration.version, - "query" => sql, - "txid" => migration.txid, - "txts" => migration.txts - } - } - end - - %Transaction{ - xid: migration.txid, - changes: records, - commit_timestamp: migration.timestamp, - origin: origin, - publication: publication, - lsn: lsn - } - end + Migration.to_transactions(migrations, origin, lsn) end @doc """ diff --git a/components/electric/lib/electric/replication/postgres/migration_consumer.ex b/components/electric/lib/electric/replication/postgres/migration_consumer.ex index 02a4d2999b..3ac7c48e80 100644 --- a/components/electric/lib/electric/replication/postgres/migration_consumer.ex +++ b/components/electric/lib/electric/replication/postgres/migration_consumer.ex @@ -5,23 +5,23 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do use GenStage import Electric.Postgres.Extension, - only: [is_ddl_relation: 1, is_extension_relation: 1, is_perms_relation: 1] + only: [ + is_ddl_relation: 1, + is_extension_relation: 1, + is_perms_relation: 1 + ] alias Electric.Postgres.{ - Extension, Extension.SchemaLoader, Extension.SchemaCache, - OidDatabase, - Schema + OidDatabase } - alias Electric.Replication.Changes.NewRecord + alias Electric.Replication.Changes.Migration alias Electric.Replication.Connectors alias Electric.Replication.Postgres.Client alias Electric.Satellite.Permissions - alias Electric.Telemetry.Metrics - require Logger @spec name(Connectors.config()) :: Electric.reg_name() @@ -136,64 +136,44 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do {filtered, state} end - defp process_permissions({changes, state}) do - %{permissions: consumer_state, loader: loader} = state - - {:ok, changes, consumer_state, loader} = - Permissions.State.update(changes, consumer_state, loader) - - {changes, %{state | permissions: consumer_state, loader: loader}} - end - - defp process_migrations({changes, %{loader: loader} = state}) do - {:ok, %{version: current_schema_version}} = SchemaLoader.load(loader) - - {state, migration_versions} = - changes - |> transaction_changes_to_migrations(state) - |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) - |> skip_applied_migrations(current_schema_version) - |> Enum.reduce({state, []}, fn migration, {state, versions} -> - {state, schema_version} = perform_migration(migration, state) - {state, [schema_version | versions]} - end) - - case migration_versions do - [] -> - {changes, state} + defp process_migrations({changes, state}) do + {changes, loader} = + Electric.Postgres.Migration.State.update(changes, state.loader, + schema_change_handler: &update_oids_after_migration(&1, &2, state), + skip_applied: true + ) - [schema_version | _] -> - state = + state = + Enum.reduce(changes, %{state | loader: loader}, fn + %Migration{} = migration, state -> + # By pre-emptively updating the permissions with the schema changes, + # we are effectively re-ordering the changes within a tx. I don't think + # this is a problem but it's something to be aware of. 
state - |> refresh_permissions_consumer(schema_version) + |> refresh_permissions_consumer(migration.schema) |> refresh_subscription() - {changes, state} - end - end + _change, state -> + state + end) - defp transaction_changes_to_migrations(changes, state) do - for %NewRecord{record: record, relation: relation} <- changes, is_ddl_relation(relation) do - {:ok, version} = SchemaLoader.tx_version(state.loader, record) - {:ok, sql} = Extension.extract_ddl_sql(record) - {version, sql} - end + {changes, state} end - defp skip_applied_migrations(migrations, schema_version) do - Enum.drop_while(migrations, fn {version, _stmts} -> version <= schema_version end) - end + defp process_permissions({changes, state}) do + %{permissions: consumer_state, loader: loader} = state - defp perform_migration({version, stmts}, state) do - {:ok, loader, schema_version} = apply_migration(version, stmts, state) + {:ok, changes, consumer_state, loader} = + Permissions.State.update(changes, consumer_state, loader) - Metrics.non_span_event( - [:postgres, :migration], - %{electrified_tables: Schema.num_electrified_tables(schema_version.schema)}, - %{migration_version: version} - ) + {changes, %{state | permissions: consumer_state, loader: loader}} + end - {%{state | loader: loader}, schema_version} + defp update_oids_after_migration(old_schema_version, new_schema_version, state) do + if state.refresh_enum_types && + old_schema_version.schema.enums != new_schema_version.schema.enums do + Client.with_conn(state.conn_opts, fn conn -> OidDatabase.update_oids(conn, [:ENUM]) end) + end end # update the subscription to add any new @@ -241,37 +221,4 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do max_demand: 50 ) end - - @doc """ - Apply a migration, composed of a version and a list of DDL statements, to a schema - using the given implementation of SchemaLoader. 
- """ - @spec apply_migration(String.t(), [String.t()], map) :: - {:ok, SchemaLoader.t(), SchemaLoader.Version.t()} | {:error, term()} - def apply_migration(version, stmts, %{loader: loader} = state) when is_list(stmts) do - {:ok, %{schema: schema} = schema_version} = SchemaLoader.load(loader) - - Logger.info("Migrating version #{schema_version.version || ""} -> #{version}") - - oid_loader = &SchemaLoader.relation_oid(loader, &1, &2, &3) - - old_enums = schema.enums - - schema = - Enum.reduce(stmts, schema_version.schema, fn stmt, schema -> - Logger.info("Applying migration #{version}: #{inspect(stmt)}") - Schema.update(schema, stmt, oid_loader: oid_loader) - end) - |> Schema.add_shadow_tables(oid_loader: oid_loader) - - Logger.info("Saving schema version #{version} /#{inspect(loader)}/") - - {:ok, loader, schema_version} = SchemaLoader.save(loader, version, schema, stmts) - - if state.refresh_enum_types and schema.enums != old_enums do - Client.with_conn(state.conn_opts, fn conn -> OidDatabase.update_oids(conn, [:ENUM]) end) - end - - {:ok, loader, schema_version} - end end diff --git a/components/electric/lib/electric/satellite/serialization.ex b/components/electric/lib/electric/satellite/serialization.ex index 64410d8624..125476738b 100644 --- a/components/electric/lib/electric/satellite/serialization.ex +++ b/components/electric/lib/electric/satellite/serialization.ex @@ -2,7 +2,6 @@ defmodule Electric.Satellite.Serialization do alias Electric.Satellite.Protocol alias Electric.Satellite.SatOpGone alias Electric.Replication.Changes.Gone - alias Electric.Postgres.Extension.SchemaCache alias Electric.Postgres.{Extension, Replication} alias Electric.Replication.Changes @@ -11,13 +10,14 @@ defmodule Electric.Satellite.Serialization do NewRecord, UpdatedRecord, DeletedRecord, - Compensation + Compensation, + Migration } use Electric.Satellite.Protobuf import Electric.Postgres.Extension, - only: [is_migration_relation: 1, is_ddl_relation: 1, is_extension_relation: 1] + only: [is_extension_relation: 1] import Bitwise @@ -101,58 +101,33 @@ defmodule Electric.Satellite.Serialization do {[%SatOpLog{ops: state.ops}], state.new_relations, state.known_relations} end - defp serialize_change(record, state) when is_migration_relation(record.relation) do - %{ - origin: origin, - schema: schema, - ops: ops, - migration_version: version, - new_relations: new_relations - } = state + defp serialize_change(%Migration{} = migration, state) do + Logger.info("Serializing migration #{inspect(migration.version)}") - state = - case(record) do - ddl when is_ddl_relation(ddl.relation) -> - {:ok, v} = SchemaCache.tx_version(origin, ddl.record) - {:ok, sql} = Extension.extract_ddl_sql(ddl.record) - - Logger.info("Serializing migration #{inspect(v)}: #{inspect(sql)}") - - # unlikely since the extension tables have constraints that prevent this - if version && version != v, - do: raise("Got DDL transaction with differing migration versions") - - {:ok, schema_version} = maybe_load_schema(origin, schema, v) - - {ops, add_relations} = - case Replication.migrate(schema_version, sql) do - {:ok, [op], relations} -> - {[%SatTransOp{op: {:migrate, op}} | ops], relations} - - {:ok, [], []} -> - {ops, []} - end - - known_relations = - Enum.reduce(add_relations, state.known_relations, fn relation, known -> - {_relation_id, _columns, _, known} = load_new_relation(relation, known) - known - end) - - %{ - state - | ops: ops, - migration_version: v, - schema: schema_version, - new_relations: new_relations ++ add_relations, - 
known_relations: known_relations - } + known_relations = + Enum.reduce(migration.relations, state.known_relations, fn relation, known -> + {_relation_id, _columns, _, known} = + load_new_relation(relation, known) - _ -> - state - end + known + end) + + ops = + migration.ops + |> Map.fetch!(:sqlite) + |> Enum.reduce(state.ops, fn op, ops -> + [%SatTransOp{op: {:migrate, op}} | ops] + end) - %{state | is_migration: true} + %{ + state + | ops: ops, + migration_version: migration.version, + schema: migration.schema, + new_relations: state.new_relations ++ migration.relations, + known_relations: known_relations, + is_migration: true + } end # writes to any table under the electric.* schema shoudn't be passed as DML @@ -183,20 +158,6 @@ defmodule Electric.Satellite.Serialization do %{state | ops: [op | ops], new_relations: new_relations, known_relations: known_relations} end - defp maybe_load_schema(origin, nil, version) do - with {:ok, schema} <- Extension.SchemaCache.load(origin, version) do - {:ok, schema} - else - error -> - Logger.error("#{origin} Unable to load schema version #{version}: #{inspect(error)}") - error - end - end - - defp maybe_load_schema(_origin, schema, _version) do - {:ok, schema} - end - defp mk_trans_op(%NewRecord{record: data, tags: tags}, rel_id, rel_cols, _) do op_insert = %SatOpInsert{ relation_id: rel_id, diff --git a/components/electric/test/electric/postgres/extension/ddl_capture_test.exs b/components/electric/test/electric/postgres/extension/ddl_capture_test.exs index 66a9860d5c..a2e32c6d4e 100644 --- a/components/electric/test/electric/postgres/extension/ddl_capture_test.exs +++ b/components/electric/test/electric/postgres/extension/ddl_capture_test.exs @@ -1,7 +1,6 @@ defmodule Electric.Postgres.Extension.DDLCaptureTest do alias Postgrex.Extension alias Electric.Postgres.MockSchemaLoader - alias Electric.Replication.Postgres.MigrationConsumer use Electric.Extension.Case, async: false, @@ -71,14 +70,16 @@ defmodule Electric.Postgres.Extension.DDLCaptureTest do # this loader instance is used by the proxy injector loader = MockSchemaLoader.agent_id(__MODULE__.Loader) - state = %{loader: loader, refresh_enum_types: false} # we have to setup the loader with knowledge of the electrified table # and the attached index, otherwise (since we're running in a tx via the proxy) # the default schema loader (backed by schemaloader.epgsql) therefore # can't lookup schema information - {:ok, ^loader, _schema} = MigrationConsumer.apply_migration("001", [sql1], state) - {:ok, ^loader, _schema} = MigrationConsumer.apply_migration("002", [sql4], state) + {:ok, ^loader, _schema} = + Electric.Postgres.Migration.State.apply_migration("001", [sql1], loader) + + {:ok, ^loader, _schema} = + Electric.Postgres.Migration.State.apply_migration("002", [sql4], loader) for sql <- [sql1, sql2, sql3, sql4, sql5] do {:ok, _cols, _rows} = :epgsql.squery(conn, sql) diff --git a/components/electric/test/electric/postgres/extension/schema_cache_test.exs b/components/electric/test/electric/postgres/extension/schema_cache_test.exs index 25c5f4132b..d1e1951684 100644 --- a/components/electric/test/electric/postgres/extension/schema_cache_test.exs +++ b/components/electric/test/electric/postgres/extension/schema_cache_test.exs @@ -169,9 +169,12 @@ defmodule Electric.Postgres.Extension.SchemaCacheTest do end defp produce_txs(producer, txs) when is_list(txs) do - MockProducer.produce(producer, txs) + # provide some unique marker that we can wait on + [txn | rest] = txs + xid = 
System.system_time(:millisecond) + MockProducer.produce(producer, [%{txn | xid: xid} | rest]) - assert_receive {MockConsumer, :events, ^txs}, 1000 + assert_receive {MockConsumer, :events, [%{xid: ^xid} | _rest]}, 1000 end defp migration_transaction(conn, version, stmts) do diff --git a/components/electric/test/electric/postgres/replication_test.exs b/components/electric/test/electric/postgres/migration_test.exs similarity index 90% rename from components/electric/test/electric/postgres/replication_test.exs rename to components/electric/test/electric/postgres/migration_test.exs index a2c0de2bb1..a1ba4bd3ed 100644 --- a/components/electric/test/electric/postgres/replication_test.exs +++ b/components/electric/test/electric/postgres/migration_test.exs @@ -1,9 +1,9 @@ -defmodule Electric.Postgres.ReplicationTest do +defmodule Electric.Postgres.MigrationTest do use ExUnit.Case, async: true use Electric.Satellite.Protobuf - alias Electric.Postgres.{Replication, Schema, Extension.SchemaLoader} + alias Electric.Postgres.{Migration, Schema, Extension.SchemaLoader} def parse(sql) do Electric.Postgres.parse!(sql) @@ -18,7 +18,7 @@ defmodule Electric.Postgres.ReplicationTest do for {sql, expected_type} <- stmts do [ast] = parse(sql) - assert Replication.stmt_type(ast) == expected_type + assert Migration.stmt_type(ast) == expected_type end end @@ -40,7 +40,7 @@ defmodule Electric.Postgres.ReplicationTest do create table teeth.front (id int8 primary key); """ |> parse() - |> Replication.affected_tables() + |> Migration.affected_tables() |> assert_table_list([{"public", "fish"}, {"public", "frog"}, {"teeth", "front"}]) end @@ -51,7 +51,7 @@ defmodule Electric.Postgres.ReplicationTest do alter table teeth.front alter column id drop default; """ |> parse() - |> Replication.affected_tables() + |> Migration.affected_tables() |> assert_table_list([{"public", "fish"}, {"public", "frog"}, {"teeth", "front"}]) end @@ -62,7 +62,7 @@ defmodule Electric.Postgres.ReplicationTest do alter table teeth.front alter column id drop default; """ |> parse() - |> Replication.affected_tables() + |> Migration.affected_tables() |> assert_table_list([{"public", "fish"}, {"public", "frog"}, {"teeth", "front"}]) end @@ -72,7 +72,7 @@ defmodule Electric.Postgres.ReplicationTest do alter table fish alter column id drop default; """ |> parse() - |> Replication.affected_tables() + |> Migration.affected_tables() |> assert_table_list([{"public", "fish"}]) end @@ -82,7 +82,7 @@ defmodule Electric.Postgres.ReplicationTest do create index on frog (id asc); """ |> parse() - |> Replication.affected_tables() + |> Migration.affected_tables() |> assert_table_list([]) end @@ -102,7 +102,7 @@ defmodule Electric.Postgres.ReplicationTest do for stmt <- stmts do stmt |> parse() - |> Replication.affected_tables() + |> Migration.affected_tables() |> assert_table_list([]) end end @@ -124,7 +124,7 @@ defmodule Electric.Postgres.ReplicationTest do version = "20230405134615" schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [msg], [{"public", "fish"}]} = Replication.migrate(schema_version, stmt) + assert {:ok, [msg], [{"public", "fish"}]} = Migration.to_op(stmt, schema_version) # there are lots of tests that validate the schema is being properly updated # assert Schema.table_names(schema) == [~s("public"."fish"), ~s("frog"), ~s("teeth"."front")] @@ -165,7 +165,7 @@ defmodule Electric.Postgres.ReplicationTest do schema = schema_update(schema, stmt) schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [msg], 
[{"teeth", "front"}]} = Replication.migrate(schema_version, stmt) + assert {:ok, [msg], [{"teeth", "front"}]} = Migration.to_op(stmt, schema_version) assert Schema.table_names(schema) == [~s("public"."fish"), ~s("teeth"."front")] assert %SatOpMigrate{version: ^version} = msg %{stmts: stmts, table: table} = msg @@ -210,7 +210,7 @@ defmodule Electric.Postgres.ReplicationTest do version = "20230405134615" schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [_msg], [{"public", "fish"}]} = Replication.migrate(schema_version, stmt) + assert {:ok, [_msg], [{"public", "fish"}]} = Migration.to_op(stmt, schema_version) # there are lots of tests that validate the schema is being properly updated assert Schema.table_names(schema) == [~s("public"."fish")] @@ -221,7 +221,7 @@ defmodule Electric.Postgres.ReplicationTest do schema = schema_update(schema, stmt) schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [msg], [{"public", "fish"}]} = Replication.migrate(schema_version, stmt) + assert {:ok, [msg], [{"public", "fish"}]} = Migration.to_op(stmt, schema_version) assert %SatOpMigrate{version: ^version} = msg @@ -269,14 +269,14 @@ defmodule Electric.Postgres.ReplicationTest do version = "20230405134615" schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [_msg], [{"public", "fish"}]} = Replication.migrate(schema_version, stmt) + assert {:ok, [_msg], [{"public", "fish"}]} = Migration.to_op(stmt, schema_version) stmt = "CREATE INDEX fish_available_index ON public.fish (avilable);" schema = schema_update(schema, stmt) version = "20230405134616" schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [msg], []} = Replication.migrate(schema_version, stmt) + assert {:ok, [msg], []} = Migration.to_op(stmt, schema_version) assert %SatOpMigrate{version: ^version} = msg %{stmts: stmts, table: table} = msg @@ -309,7 +309,7 @@ defmodule Electric.Postgres.ReplicationTest do schema_version = SchemaLoader.Version.new(version, schema) for stmt <- stmts do - assert {:ok, [], []} = Replication.migrate(schema_version, stmt) + assert {:ok, [], []} = Migration.to_op(stmt, schema_version) end end @@ -331,7 +331,7 @@ defmodule Electric.Postgres.ReplicationTest do # version = "20230405134615" # schema_version = SchemaLoader.Version.new(version, schema) - # assert {:error, schema} = Replication.migrate(schema_version, stmts) + # assert {:error, schema} = Migration.to_op(stmts, schema_version) # end end end diff --git a/components/electric/test/electric/replication/electrification_test.exs b/components/electric/test/electric/replication/electrification_test.exs index 51b6644b71..250ff2afbb 100644 --- a/components/electric/test/electric/replication/electrification_test.exs +++ b/components/electric/test/electric/replication/electrification_test.exs @@ -4,7 +4,7 @@ defmodule Electric.Replication.ElectrificationTest do import Electric.Postgres.TestConnection alias Electric.Postgres.CachedWal - alias Electric.Replication.Changes.{NewRecord, Transaction} + alias Electric.Replication.Changes.{Migration, Transaction} @origin "electrification-test" @sleep_timeout 5000 @@ -23,12 +23,11 @@ defmodule Electric.Replication.ElectrificationTest do assert {:ok, lsn, %Transaction{ changes: [ - %NewRecord{ + %Migration{ relation: {"electric", "ddl_commands"}, - record: %{ - "query" => - "CREATE TABLE foo (\n id text NOT NULL,\n CONSTRAINT foo_pkey PRIMARY KEY (id)\n);\n\n\n" - } + ddl: [ + "CREATE TABLE foo (\n id text NOT NULL,\n CONSTRAINT foo_pkey PRIMARY KEY 
(id)\n);\n\n\n" + ] } ], origin: @origin @@ -58,12 +57,11 @@ defmodule Electric.Replication.ElectrificationTest do assert {:ok, lsn, %Transaction{ changes: [ - %NewRecord{ + %Migration{ relation: {"electric", "ddl_commands"}, - record: %{ - "query" => - "CREATE TABLE foo (\n id text NOT NULL,\n CONSTRAINT foo_pkey PRIMARY KEY (id)\n);\n\n\n" - } + ddl: [ + "CREATE TABLE foo (\n id text NOT NULL,\n CONSTRAINT foo_pkey PRIMARY KEY (id)\n);\n\n\n" + ] } ], origin: @origin diff --git a/components/electric/test/electric/replication/initial_sync_test.exs b/components/electric/test/electric/replication/initial_sync_test.exs index a079ce08b9..d921b7ce87 100644 --- a/components/electric/test/electric/replication/initial_sync_test.exs +++ b/components/electric/test/electric/replication/initial_sync_test.exs @@ -3,8 +3,8 @@ defmodule Electric.Replication.InitialSyncTest do import Electric.Postgres.TestConnection - alias Electric.Postgres.{CachedWal, Extension, Lsn} - alias Electric.Replication.Changes.{NewRecord, Transaction} + alias Electric.Postgres.{CachedWal, Lsn} + alias Electric.Replication.Changes.{Migration, Transaction} alias Electric.Replication.InitialSync alias Electric.Replication.Postgres.Client @@ -48,15 +48,9 @@ defmodule Electric.Replication.InitialSyncTest do assert is_integer(xid) assert %DateTime{} = timestamp - migration_relation = Extension.ddl_relation() - - assert %NewRecord{ - relation: ^migration_relation, - record: %{ - "query" => "CREATE TABLE users" <> _, - "version" => ^version_1 - }, - tags: [] + assert %Migration{ + version: ^version_1, + ddl: ["CREATE TABLE users" <> _] } = migration :ok = electrify_table(conn, "public.documents", version_2) @@ -89,28 +83,18 @@ defmodule Electric.Replication.InitialSyncTest do assert %DateTime{} = timestamp1 assert %DateTime{} = timestamp2 - migration1_version = Map.fetch!(migration1.record, "version") - migration2_version = Map.fetch!(migration2.record, "version") + migration1_version = migration1.version + migration2_version = migration2.version assert migration1_version < migration2_version - migration_relation = Extension.ddl_relation() - assert [ - %NewRecord{ - relation: ^migration_relation, - record: %{ - "query" => "CREATE TABLE users" <> _, - "version" => ^migration1_version - }, - tags: [] + %Migration{ + ddl: ["CREATE TABLE users" <> _], + version: ^migration1_version }, - %NewRecord{ - relation: ^migration_relation, - record: %{ - "query" => "CREATE TABLE documents" <> _, - "version" => ^migration2_version - }, - tags: [] + %Migration{ + ddl: ["CREATE TABLE documents" <> _], + version: ^migration2_version } ] = [migration1, migration2] end diff --git a/components/electric/test/electric/replication/postgres/migration_consumer_test.exs b/components/electric/test/electric/replication/postgres/migration_consumer_test.exs index dad3de4c8a..c19dd1b322 100644 --- a/components/electric/test/electric/replication/postgres/migration_consumer_test.exs +++ b/components/electric/test/electric/replication/postgres/migration_consumer_test.exs @@ -5,6 +5,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do alias Electric.Replication.Changes alias Electric.Replication.Changes.NewRecord alias Electric.Replication.Changes.Transaction + alias Electric.Replication.Changes.Migration alias Electric.Replication.Postgres.MigrationConsumer @receive_timeout 500 @@ -211,7 +212,33 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do GenStage.call(producer, {:emit, cxt.loader, events, version}) - assert_receive 
{FakeConsumer, :events, ^events}, @receive_timeout + assert_receive {FakeConsumer, :events, events}, @receive_timeout + + assert [ + %Transaction{ + changes: [ + %Migration{ + version: "20220421", + schema: %{version: "20220421"}, + ddl: [ + "create table something_else (id uuid primary key);", + "create table other_thing (id uuid primary key);", + "create table yet_another_thing (id uuid primary key);" + ], + relations: [ + {"public", "other_thing"}, + {"public", "something_else"}, + {"public", "yet_another_thing"} + ] + } + ], + commit_timestamp: ~U[2023-05-02 10:08:00.948788Z], + origin: ^origin, + publication: "mock_pub", + origin_type: :postgresql + } + ] = events + assert_receive {MockSchemaLoader, :load}, @receive_timeout # only 1 save instruction is observed assert_receive {MockSchemaLoader, {:save, ^version, schema, [_, _, _]}}, @receive_timeout @@ -346,30 +373,29 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do } ] - filtered_events = [ - %Transaction{ - changes: [ - %NewRecord{ - relation: {"electric", "ddl_commands"}, - record: %{ - "id" => "6", - "query" => "create table something_else (id uuid primary key);", - "txid" => "101", - "txts" => "201" - }, - tags: [] - } - ], - commit_timestamp: ~U[2023-05-02 10:08:00.948788Z], - origin: origin, - publication: "mock_pub", - origin_type: :postgresql - } - ] - GenStage.call(producer, {:emit, cxt.loader, raw_events, version}) - assert_receive {FakeConsumer, :events, ^filtered_events}, 1000 + assert_receive {FakeConsumer, :events, filtered_events}, 1000 + + assert [ + %Transaction{ + changes: [ + %Migration{ + version: "20220421", + ddl: [ + "create table something_else (id uuid primary key);" + ], + relations: [{"public", "something_else"}], + relation: {"electric", "ddl_commands"} + } + ], + commit_timestamp: ~U[2023-05-02 10:08:00.948788Z], + origin: ^origin, + publication: "mock_pub", + origin_type: :postgresql + } + ] = filtered_events + assert_receive {MockSchemaLoader, :load}, 500 assert_receive {MockSchemaLoader, @@ -573,30 +599,20 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do assert [ %Transaction{ changes: [ - %NewRecord{ - relation: {"electric", "ddl_commands"}, - record: %{ - "id" => "6", - "query" => "create table teams (id uuid primary key);", - "txid" => "101", - "txts" => "201" - } - }, - %NewRecord{ - relation: {"electric", "ddl_commands"}, - record: %{ - "id" => "7", - "query" => """ + %Migration{ + version: "20220421", + ddl: [ + "create table teams (id uuid primary key);", + """ create table team_memberships ( id uuid primary key, team_id uuid references teams (id), user_id uuid references users (id), team_role text not null ); - """, - "txid" => "101", - "txts" => "201" - } + """ + ], + schema: %{version: "20220421"} }, %Changes.UpdatedPermissions{ type: :global, diff --git a/components/electric/test/electric/satellite/serialization_test.exs b/components/electric/test/electric/satellite/serialization_test.exs index baa6ba2a6d..ace6797c6c 100644 --- a/components/electric/test/electric/satellite/serialization_test.exs +++ b/components/electric/test/electric/satellite/serialization_test.exs @@ -396,54 +396,69 @@ defmodule Electric.Satellite.SerializationTest do test "writes to electric ddl table are recognised as migration ops", cxt do version = "20220421" - tx = %Transaction{ - changes: [ - %Electric.Replication.Changes.UpdatedRecord{ - relation: {"electric", "ddl_commands"}, - old_record: nil, - record: %{ - "id" => "6", - "query" => "create table something_else (id uuid 
primary key);", - "txid" => "100", - "txts" => "200" - }, - tags: ["postgres_1@1682019749178"] + changes = [ + %Electric.Replication.Changes.NewRecord{ + relation: {"electric", "ddl_commands"}, + record: %{ + "id" => "6", + "query" => "create table something_else (id uuid primary key);", + "txid" => "100", + "txts" => "200" }, - %Electric.Replication.Changes.UpdatedRecord{ - relation: {"electric", "ddl_commands"}, - old_record: nil, - record: %{ - "id" => "7", - "query" => "create table other_thing (id uuid primary key);", - "txid" => "100", - "txts" => "200" - }, - tags: ["postgres_1@1682019749178"] + tags: ["postgres_1@1682019749178"] + }, + %Electric.Replication.Changes.NewRecord{ + relation: {"electric", "ddl_commands"}, + record: %{ + "id" => "7", + "query" => "create table other_thing (id uuid primary key);", + "txid" => "100", + "txts" => "200" }, - %Electric.Replication.Changes.UpdatedRecord{ - relation: {"electric", "ddl_commands"}, - old_record: nil, - record: %{ - "id" => "8", - "query" => "create table yet_another_thing (id uuid primary key);", - "txid" => "100", - "txts" => "200" - }, - tags: ["postgres_1@1682019749178"] - } - ], - commit_timestamp: ~U[2023-04-20 14:05:31.416063Z], - origin: cxt.origin, - publication: "all_tables", - lsn: %Lsn{segment: 0, offset: 0}, - origin_type: :postgresql - } + tags: ["postgres_1@1682019749178"] + }, + %Electric.Replication.Changes.NewRecord{ + relation: {"electric", "ddl_commands"}, + record: %{ + "id" => "8", + "query" => "create table yet_another_thing (id uuid primary key);", + "txid" => "100", + "txts" => "200" + }, + tags: ["postgres_1@1682019749178"] + } + ] + + tx = + %Transaction{ + changes: changes, + commit_timestamp: ~U[2023-04-20 14:05:31.416063Z], + origin: cxt.origin, + publication: "all_tables", + lsn: %Lsn{segment: 0, offset: 0}, + origin_type: :postgresql + } migrate_schema(tx, version, cxt) - {oplog, - [{"public", "something_else"}, {"public", "other_thing"}, {"public", "yet_another_thing"}], - %{}} = Serialization.serialize_trans(tx, 1, %{}) + # The migration consumer replaces the series of NewRecords against the ddl table + # into a single migration message + tx = + %Transaction{ + changes: Electric.Postgres.Migration.State.convert(changes, cxt.loader), + commit_timestamp: ~U[2023-04-20 14:05:31.416063Z], + origin: cxt.origin, + publication: "all_tables", + lsn: %Lsn{segment: 0, offset: 0}, + origin_type: :postgresql + } + + assert {oplog, + [ + {"public", "other_thing"}, + {"public", "something_else"}, + {"public", "yet_another_thing"} + ], %{}} = Serialization.serialize_trans(tx, 1, %{}) assert [%SatOpLog{ops: ops}] = oplog diff --git a/components/electric/test/support/postgres_test_connection.ex b/components/electric/test/support/postgres_test_connection.ex index 666805ef4b..627e58850e 100644 --- a/components/electric/test/support/postgres_test_connection.ex +++ b/components/electric/test/support/postgres_test_connection.ex @@ -3,7 +3,6 @@ defmodule Electric.Postgres.TestConnection do import ExUnit.Assertions alias Electric.Replication.{Postgres.Client, PostgresConnector, PostgresConnectorMng} - alias Electric.Postgres.Extension require Electric.Postgres.Extension @@ -213,9 +212,24 @@ defmodule Electric.Postgres.TestConnection do & &1 ) |> Stream.reject(&(&1.changes == [])) - |> Stream.take(10) - |> Enum.find(&Enum.all?(&1.changes, fn x -> Extension.is_ddl_relation(x.relation) end)) || - flunk("Migration statements didn't show up in the cached WAL") + |> Enum.reduce_while(10, fn + _tx, 0 -> + {:halt, :error} + + 
%{changes: changes}, n -> + if Enum.any?(changes, &is_struct(&1, Electric.Replication.Changes.Migration)) do + {:halt, :ok} + else + {:cont, n - 1} + end + end) + |> case do + :ok -> + :ok + + :error -> + flunk("Migration statements didn't show up in the cached WAL") + end [electrified_count: electrified_count] end diff --git a/e2e/tests/01.05_electric_can_recreate_publication.lux b/e2e/tests/01.05_electric_can_recreate_publication.lux index e42fe3290b..3cbba828ec 100644 --- a/e2e/tests/01.05_electric_can_recreate_publication.lux +++ b/e2e/tests/01.05_electric_can_recreate_publication.lux @@ -36,9 +36,7 @@ # Make sure Electric consumes all migrations from the replication stream before stopping it. [shell electric] ?component=CachedWal.EtsBacked origin=postgres_1 \[debug\] Saving transaction\ - .+ with changes \[%Electric.Replication.Changes.NewRecord\{\ - relation: \{"electric", "ddl_commands"\}, \ - record: %\{.*"query" => "CREATE TABLE baz + .+ with changes \[%Electric.Replication.Changes.Migration\{version: "003" [shell log] [invoke stop_electric 1] diff --git a/e2e/tests/02.03_partial_replication_based_on_user_id.lux b/e2e/tests/02.03_partial_replication_based_on_user_id.lux index f3f4eb6c2a..e06db74aa7 100644 --- a/e2e/tests/02.03_partial_replication_based_on_user_id.lux +++ b/e2e/tests/02.03_partial_replication_based_on_user_id.lux @@ -26,12 +26,14 @@ ?$psql [shell electric] - # We expect to send the transaction to both satellites + # migration is sent for both clients + ?+client_id=client_1_1 .+ user_id=1 \[debug\] trans:(.*)%Electric.Replication.Changes.Migration + ?client_id=client_2_1 .+ user_id=2 \[debug\] trans:(.*)%Electric.Replication.Changes.Migration + # the insert is only sent to user_id=1 ?client_id=client_1_1 .+ user_id=1 \[debug\] trans:(.*)%Electric.Replication.Changes.NewRecord - ?client_id=client_2_1 .+ user_id=2 \[debug\] trans:(.*)%Electric.Replication.Changes.NewRecord [shell user_1_ws1] - # And recieve it on Satellite 1. Assertion for the row missing on Satellite 2 is above + # And receive it on Satellite 1. Assertion for the row missing on Satellite 2 is above ?rec \[\d+\]: %Electric.Satellite.SatOpLog\{(.*)row_data: %Electric\.Satellite\.SatOpRow\{ ?values: \["00000000-0000-0000-0000-000000000000", "1", "sentinel value"\] ?\} diff --git a/e2e/tests/06.02_permissions_change_propagation.lux b/e2e/tests/06.02_permissions_change_propagation.lux index 172078921f..ceb25bad39 100644 --- a/e2e/tests/06.02_permissions_change_propagation.lux +++ b/e2e/tests/06.02_permissions_change_propagation.lux @@ -7,7 +7,7 @@ [global migration_version_2=20240226114300] [global user_id1=95f21e62-4b90-49c3-874a-174eb17e58cf] [global user_id2=31377df9-c659-493e-b26f-1ce5fbb0b2df] -[global session_id=004d3e42-d072-4a60-9513-93ddd843d478] +[global session_id=001] [global project_id=99adf0a5-b3c6-45d7-9986-582e76db4556]
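
---

Usage sketch (illustrative; `handle_change/1` is a hypothetical consumer, not
part of this diff). Downstream code now matches on the consolidated migration
change instead of raw `electric.ddl_commands` rows:

    def handle_change(%Electric.Replication.Changes.Migration{} = m) do
      # m.version   - the migration version string
      # m.ddl       - the original DDL statements for that version
      # m.ops       - %Ops{} holding per-dialect %SatOpMigrate{} messages
      #               (only :sqlite is populated by Migration.dialects/0 today)
      # m.relations - the affected {schema, name} tuples
      m.ops.sqlite
    end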