Is it possible to use event time/synthetic time rather than system time? #465
Comments
You could always have a sink that saves events along with a timestamp. You can also pass events into a stream by reading from a file, or from any function that repeatedly calls emit(). However, if you have all the events in hand, you don't actually need any stream processing, right? If the purpose is to simulate exactly how the streamz pipeline will react, you could make a source which uses the event loop's call_at method to emit events, or some simple polling, but that seems like extra and potentially fragile work!
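For illustration, the call_at idea could look roughly like the sketch below. It is not streamz code: `replay`, the `speed` parameter and the `{"timestamp": ..., "value": ...}` event layout are all made up for the example, and `emit` is any callable (with streamz it would presumably be the `emit` method of a source stream). Note that this only reproduces the recorded spacing in wall-clock time; downstream time-based nodes still read the system clock.

```python
import asyncio


async def replay(events, emit, speed=1.0):
    """Call emit(event) for each recorded event, preserving the recorded gaps.

    `events` is a list of dicts with a numeric "timestamp" key in seconds
    (an assumed layout); speed > 1 compresses the gaps.
    """
    events = sorted(events, key=lambda e: e["timestamp"])
    if not events:
        return
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    t0 = events[0]["timestamp"]  # first recorded timestamp
    start = loop.time()          # loop clock reading that t0 maps to

    def step(i):
        try:
            emit(events[i])
        except Exception as exc:
            # Surface the failure to the caller instead of hanging.
            done.set_exception(exc)
            return
        if i + 1 == len(events):
            done.set_result(None)
            return
        # Schedule the next event at its recorded offset from the first one.
        when = start + (events[i + 1]["timestamp"] - t0) / speed
        loop.call_at(when, step, i + 1)

    loop.call_at(start, step, 0)
    await done


# Hypothetical recorded data; a plain list stands in for the stream.
recorded = [
    {"timestamp": 0.00, "value": "a"},
    {"timestamp": 0.02, "value": "b"},
    {"timestamp": 0.05, "value": "c"},
]
seen = []
asyncio.run(replay(recorded, seen.append, speed=10.0))
```

Chaining each callback from the previous one keeps events with equal timestamps in their recorded order, which scheduling them all up front would not guarantee.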
Yes, the goal is to backtest/simulate an existing stream as close to the real scenario as possible, using recorded historical data. Another point is that a lot of my offline processing logic can't be vectorized, so I need to use an iterative approach (i.e. iterating through a dataframe's rows) in both the offline and online cases. streamz comes in handy here, giving declarative consistency for both online and offline processing logic (yes, I'm aware of the perf decrease due to iterating over a Pandas dataframe's rows).
Can you please give a code example on this? Let's say I have a list (or a dataframe) of events, each having |
I want to use streamz for offline processing on historical data, and the data is timestamped. Looking through the source code, the time-related streams (e.g. `timed_window`, `delay`, `rate_limit`, etc.) use the `time()` function for event time. Is it possible to feed an existing value (i.e. `event['timestamp']`) to these streams so I can do offline processing? If not, what would be the best way to go about this?
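To make the desired behaviour concrete, here is a minimal sketch of a window keyed on `event['timestamp']` instead of the system clock. It is plain Python, not a streamz feature: the `EventTimeWindow` class, its `push`/`flush` methods and the event layout are hypothetical, and it assumes events arrive in timestamp order. Presumably something like it could be wired into a pipeline through a stateful function, but that wiring is not shown here.

```python
class EventTimeWindow:
    """Tumbling window driven by each event's own timestamp (hypothetical helper)."""

    def __init__(self, size):
        self.size = size          # window length, in the timestamps' units
        self.window_start = None  # start of the currently open window
        self.buffer = []          # events collected in the open window

    def push(self, event):
        """Add one event; return the window it closed, or None."""
        start = (event["timestamp"] // self.size) * self.size
        closed = None
        if self.buffer and start != self.window_start:
            # The event belongs to a later window, so the open one is complete.
            closed, self.buffer = self.buffer, []
        self.window_start = start
        self.buffer.append(event)
        return closed

    def flush(self):
        """Return whatever is left once the historical data is exhausted."""
        closed, self.buffer = self.buffer, []
        return closed


# Hypothetical historical events with integer timestamps.
history = [{"timestamp": t} for t in (0, 3, 7, 12, 14, 31)]
window = EventTimeWindow(size=10)
windows = []
for ev in history:
    closed = window.push(ev)
    if closed is not None:
        windows.append([e["timestamp"] for e in closed])
windows.append([e["timestamp"] for e in window.flush()])
```

Because a window closes only when a later event arrives, replaying the same history always yields the same windows, regardless of how fast the rows are iterated.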