Skip to content

Commit

Permalink
Merge branch 'master' into PLAT-191_funnel_tutorial
Browse files Browse the repository at this point in the history
  • Loading branch information
dpanina committed Nov 8, 2022
2 parents 068bfd3 + 1b6f1e1 commit 00a900d
Show file tree
Hide file tree
Showing 46 changed files with 2,216 additions and 544 deletions.
7 changes: 5 additions & 2 deletions src/data_processor/data_processor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from __future__ import annotations

import typing
import uuid
from typing import Any

from src.data_processor.registry import register_dataprocessor
from src.eventstream.eventstream import Eventstream
from src.params_model import ParamsModel

if typing.TYPE_CHECKING:
from src.eventstream.types import EventstreamType


class DataProcessor:
params: ParamsModel
Expand All @@ -28,7 +31,7 @@ def __call__(self, params: ParamsModel) -> DataProcessor:
DataProcessor.__init__(self, params=params)
return self

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
raise NotImplementedError

def export(self) -> dict[str, Any]:
Expand Down
6 changes: 4 additions & 2 deletions src/data_processors_lib/rete/collapse_loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pandas as pd

from src.data_processor.data_processor import DataProcessor
from src.eventstream.eventstream import Eventstream
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel


Expand Down Expand Up @@ -67,7 +67,9 @@ class CollapseLoops(DataProcessor):
def __init__(self, params: CollapseLoopsParams):
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

user_col = eventstream.schema.user_id
time_col = eventstream.schema.event_timestamp
type_col = eventstream.schema.event_type
Expand Down
6 changes: 4 additions & 2 deletions src/data_processors_lib/rete/delete_users_by_path_length.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from src.data_processor.data_processor import DataProcessor
from src.data_processors_lib.rete.constants import DATETIME_UNITS
from src.eventstream.eventstream import Eventstream
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel
from src.widget.widgets import ReteTimeWidget

Expand Down Expand Up @@ -52,7 +52,9 @@ class DeleteUsersByPathLength(DataProcessor):
def __init__(self, params: DeleteUsersByPathLengthParams):
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

user_col = eventstream.schema.user_id
time_col = eventstream.schema.event_timestamp

Expand Down
11 changes: 7 additions & 4 deletions src/data_processors_lib/rete/filter_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from pandas import DataFrame

from src.data_processor.data_processor import DataProcessor
from src.eventstream.eventstream import Eventstream
from src.eventstream.schema import EventstreamSchema
from src.eventstream.types import EventstreamSchemaType, EventstreamType
from src.params_model import ParamsModel


Expand Down Expand Up @@ -41,8 +41,10 @@ class FilterEvents(DataProcessor):
def __init__(self, params: FilterEventsParams):
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
filter_: Callable[[DataFrame, EventstreamSchema], bool] = self.params.filter # type: ignore
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

filter_: Callable[[DataFrame, EventstreamSchemaType], bool] = self.params.filter # type: ignore
events: pd.DataFrame = eventstream.to_dataframe()
mask = filter_(events, eventstream.schema)
events_to_delete = events[~mask]
Expand All @@ -55,6 +57,7 @@ def apply(self, eventstream: Eventstream) -> Eventstream:
raw_data=events_to_delete,
relations=[{"raw_col": "ref", "eventstream": eventstream}],
)
eventstream.soft_delete(eventstream.to_dataframe())
if not events_to_delete.empty:
eventstream.soft_delete(events=eventstream.to_dataframe())

return eventstream
9 changes: 5 additions & 4 deletions src/data_processors_lib/rete/group_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import pandas as pd

from src.data_processor.data_processor import DataProcessor
from src.eventstream.eventstream import Eventstream
from src.eventstream.schema import EventstreamSchema
from src.eventstream.types import EventstreamSchemaType, EventstreamType
from src.params_model import ParamsModel

EventstreamFilter = Callable[[pd.DataFrame, EventstreamSchema], Any]
EventstreamFilter = Callable[[pd.DataFrame, EventstreamSchemaType], Any]


class GroupEventsParams(ParamsModel):
Expand Down Expand Up @@ -63,7 +62,9 @@ class GroupEvents(DataProcessor):
def __init__(self, params: GroupEventsParams) -> None:
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

event_name = self.params.event_name
filter_: Callable = self.params.filter
event_type = self.params.event_type
Expand Down
6 changes: 4 additions & 2 deletions src/data_processors_lib/rete/lost_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from src.data_processor.data_processor import DataProcessor
from src.data_processors_lib.rete.constants import DATETIME_UNITS
from src.eventstream.eventstream import Eventstream
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel
from src.widget.widgets import ListOfInt, ReteTimeWidget

Expand Down Expand Up @@ -70,7 +70,9 @@ class LostUsersEvents(DataProcessor):
def __init__(self, params: LostUsersParams):
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

user_col = eventstream.schema.user_id
time_col = eventstream.schema.event_timestamp
type_col = eventstream.schema.event_type
Expand Down
13 changes: 9 additions & 4 deletions src/data_processors_lib/rete/negative_target.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from __future__ import annotations

from typing import Callable, List
from typing import Any, Callable, List

import pandas as pd

from src.data_processor.data_processor import DataProcessor
from src.eventstream.eventstream import Eventstream
from src.eventstream.schema import EventstreamSchema
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel
from src.widget.widgets import ListOfString, ReteFunction

EventstreamFilter = Callable[[pd.DataFrame, EventstreamSchema], Any]

def _default_func_negative(eventstream: Eventstream, negative_target_events: List[str]) -> pd.DataFrame:

def _default_func_negative(eventstream: EventstreamType, negative_target_events: List[str]) -> pd.DataFrame:
"""
Filters rows with target events from the input eventstream.
Expand Down Expand Up @@ -84,7 +87,9 @@ class NegativeTarget(DataProcessor):
def __init__(self, params: NegativeTargetParams):
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

type_col = eventstream.schema.event_type
event_col = eventstream.schema.event_name

Expand Down
6 changes: 4 additions & 2 deletions src/data_processors_lib/rete/new_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pandas import DataFrame

from src.data_processor.data_processor import DataProcessor
from src.eventstream.eventstream import Eventstream
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel
from src.widget.widgets import ListOfIntNewUsers

Expand Down Expand Up @@ -56,7 +56,9 @@ class NewUsersEvents(DataProcessor):
def __init__(self, params: NewUsersParams):
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

events: DataFrame = eventstream.to_dataframe(copy=True)
user_col = eventstream.schema.user_id
time_col = eventstream.schema.event_timestamp
Expand Down
10 changes: 6 additions & 4 deletions src/data_processors_lib/rete/positive_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import pandas as pd

from src.data_processor.data_processor import DataProcessor
from src.eventstream.eventstream import Eventstream
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel
from src.widget.widgets import ListOfString, ReteFunction


def _default_func_positive(eventstream: Eventstream, positive_target_events: List[str]) -> pd.DataFrame:
def _default_func_positive(eventstream: EventstreamType, positive_target_events: list[str]) -> pd.DataFrame:
"""
Filters rows with target events from the input eventstream.
Expand Down Expand Up @@ -84,11 +84,13 @@ class PositiveTarget(DataProcessor):
def __init__(self, params: PositiveTargetParams):
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

type_col = eventstream.schema.event_type
event_col = eventstream.schema.event_name

positive_function: Callable[[Eventstream, list[str]], pd.DataFrame] = self.params.positive_function
positive_function: Callable[[EventstreamType, list[str]], pd.DataFrame] = self.params.positive_function
positive_target_events = self.params.positive_target_events

positive_targets = positive_function(eventstream, positive_target_events)
Expand Down
6 changes: 4 additions & 2 deletions src/data_processors_lib/rete/split_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

from src.data_processor.data_processor import DataProcessor
from src.data_processors_lib.rete.constants import DATETIME_UNITS
from src.eventstream.eventstream import Eventstream
from src.eventstream.schema import EventstreamSchema
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel
from src.widget.widgets import ReteTimeWidget

Expand Down Expand Up @@ -103,7 +103,9 @@ class SplitSessions(DataProcessor):
def __init__(self, params: SplitSessionsParams) -> None:
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

user_col = eventstream.schema.user_id
time_col = eventstream.schema.event_timestamp
type_col = eventstream.schema.event_type
Expand Down
6 changes: 4 additions & 2 deletions src/data_processors_lib/rete/start_end_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pandas import DataFrame

from src.data_processor.data_processor import DataProcessor
from src.eventstream.eventstream import Eventstream
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel


Expand Down Expand Up @@ -47,7 +47,9 @@ class StartEndEvents(DataProcessor):
def __init__(self, params: StartEndEventsParams) -> None:
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

events: DataFrame = eventstream.to_dataframe(copy=True)
user_col = eventstream.schema.user_id
time_col = eventstream.schema.event_timestamp
Expand Down
6 changes: 4 additions & 2 deletions src/data_processors_lib/rete/truncate_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any, Literal, Optional

from src.data_processor.data_processor import DataProcessor
from src.eventstream.eventstream import Eventstream
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel


Expand Down Expand Up @@ -56,7 +56,9 @@ class TruncatePath(DataProcessor):
def __init__(self, params: TruncatePathParams):
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

user_col = eventstream.schema.user_id
time_col = eventstream.schema.event_timestamp
event_col = eventstream.schema.event_name
Expand Down
6 changes: 4 additions & 2 deletions src/data_processors_lib/rete/truncated_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from src.data_processor.data_processor import DataProcessor
from src.data_processors_lib.rete.constants import DATETIME_UNITS
from src.eventstream.eventstream import Eventstream
from src.eventstream.types import EventstreamType
from src.params_model import ParamsModel
from src.widget.widgets import ReteTimeWidget

Expand Down Expand Up @@ -75,7 +75,9 @@ class TruncatedEvents(DataProcessor):
def __init__(self, params: TruncatedEventsParams):
super().__init__(params=params)

def apply(self, eventstream: Eventstream) -> Eventstream:
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from src.eventstream.eventstream import Eventstream

events: DataFrame = eventstream.to_dataframe(copy=True)
user_col = eventstream.schema.user_id
time_col = eventstream.schema.event_timestamp
Expand Down
37 changes: 32 additions & 5 deletions src/eventstream/eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,25 @@
import plotly.graph_objects as go

from src.eventstream.schema import EventstreamSchema, RawDataSchema
from src.eventstream.types import EventstreamType, Relation
from src.eventstream.types import EventstreamType, RawDataSchemaType, Relation
from src.tooling.funnel import Funnel
from src.utils import get_merged_col
from src.utils.list import find_index

from .helpers import NewUsersHelperMixin, StartEndHelperMixin
from .helpers import (
CollapseLoopsHelperMixin,
DeleteUsersByPathLengthHelperMixin,
FilterHelperMixin,
GroupHelperMixin,
LostUsersHelperMixin,
NegativeTargetHelperMixin,
NewUsersHelperMixin,
PositiveTargetHelperMixin,
SplitSessionsHelperMixin,
StartEndHelperMixin,
TruncatedEventsHelperMixin,
TruncatePathHelperMixin,
)

IndexOrder = List[Optional[str]]

Expand Down Expand Up @@ -51,16 +64,30 @@
# TODO work out column reservation


class Eventstream(StartEndHelperMixin, NewUsersHelperMixin, EventstreamType):
class Eventstream(
CollapseLoopsHelperMixin,
DeleteUsersByPathLengthHelperMixin,
FilterHelperMixin,
GroupHelperMixin,
LostUsersHelperMixin,
NegativeTargetHelperMixin,
NewUsersHelperMixin,
PositiveTargetHelperMixin,
SplitSessionsHelperMixin,
StartEndHelperMixin,
TruncatedEventsHelperMixin,
TruncatePathHelperMixin,
EventstreamType,
):
schema: EventstreamSchema
index_order: IndexOrder
relations: List[Relation]
__raw_data_schema: RawDataSchema
__raw_data_schema: RawDataSchemaType
__events: pd.DataFrame | pd.Series[Any]

def __init__(
self,
raw_data_schema: RawDataSchema,
raw_data_schema: RawDataSchemaType,
raw_data: pd.DataFrame | pd.Series[Any],
schema: EventstreamSchema | None = None,
prepare: bool = True,
Expand Down
10 changes: 10 additions & 0 deletions src/eventstream/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
from .collapse_loops_helper import CollapseLoopsHelperMixin
from .delete_users_by_path_length_helper import DeleteUsersByPathLengthHelperMixin
from .filter_helper import FilterHelperMixin
from .group_helper import GroupHelperMixin
from .lost_users_helper import LostUsersHelperMixin
from .negative_target import NegativeTargetHelperMixin
from .new_users_helper import NewUsersHelperMixin
from .positive_target import PositiveTargetHelperMixin
from .split_session_helper import SplitSessionsHelperMixin
from .start_end_helper import StartEndHelperMixin
from .truncate_events_helper import TruncatedEventsHelperMixin
from .truncate_path_helper import TruncatePathHelperMixin
Loading

0 comments on commit 00a900d

Please sign in to comment.