test(pyspark): validate pyspark-specific tests for streaming #8945
base: main
Conversation
Branch updated: aa81cd4 → 1c5680b → b8894d0
# E.g.,
# cursor.query.writeStream.format("memory").queryName("table_name").start()
#
# This in-memory table might conflict with those defined
Can we use arbitrary names from the name generator? The chance of clashing is never 0, but it's pretty low.
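A rough sketch of that suggestion, assuming a name generator such as ibis.util.gen_name is acceptable here (a uuid suffix would work just as well):

from ibis import util

# Sketch only: give the in-memory sink a collision-resistant query name
# instead of a hard-coded "table_name".
query_name = util.gen_name("memory_sink")
cursor.query.writeStream.format("memory").queryName(query_name).start()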
import glob

files = glob.glob(f"{dir_}/*.parquet")
df_list = [pd.read_parquet(f) for f in files]
I think you can pass the files list directly to pd.read_parquet, or just the directory itself.
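A minimal sketch of both options; whether pd.read_parquet accepts a list of paths depends on the pandas/pyarrow versions in use, so treat that variant as an assumption:

import pandas as pd

# Option 1: point pandas/pyarrow at the directory and let it pick up every
# parquet file underneath.
df = pd.read_parquet(dir_)

# Option 2: pass the collected file list directly (recent pyarrow engine).
df = pd.read_parquet(files, engine="pyarrow")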
.option("path", dir_) | ||
.trigger(availableNow=True) | ||
.start() | ||
.awaitTermination() |
Are you assuming here that the job will always finish? What if it's a continuous query?
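For reference, a small sketch of the distinction (df stands in for the streaming DataFrame being written; this is an illustration, not the PR's code): availableNow drains whatever input exists at start time and then stops, so awaitTermination() returns, while a periodic or continuous trigger never terminates on its own.

# Bounded run: the query stops once the currently available data is processed.
(
    df.writeStream.format("parquet")
    .option("path", dir_)
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)

# A continuous query would block forever here; awaitTermination takes an
# optional timeout in seconds and returns False if the query is still running.
# query.awaitTermination(timeout=60)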
def _fetch_from_cursor(self, cursor, schema):
    df = cursor.query.toPandas()  # blocks until finished
    if cursor.query.isStreaming:
        df = self._execute_stream(cursor.query)
I don't think we actually support converting the results back into a pandas df in the Flink backend right now... Also, these outputs can get arbitrarily large because the query runs on continuous data. I get that the intention is to maintain the same interface/UX across streaming and batch jobs, but I wonder if this makes sense.
@@ -22,22 +25,41 @@ def set_pyspark_database(con, database):
 class TestConf(BackendTest):
     deps = ("pyspark",)

-    def _load_data(self, **_: Any) -> None:
+    def _load_data_helper(self, for_streaming: bool = False):
just streaming?
# cases, you can re-enable schema inference by setting
# spark.sql.streaming.schemaInference to true."
# Ref: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#schema-inference-and-partition-of-streaming-dataframesdatasets
s.sql("set spark.sql.streaming.schemaInference=true")
You can pass this as a config to the session builder
ibis/ibis/backends/pyspark/tests/conftest.py
Lines 169 to 191 in b8894d0
config = (
    SparkSession.builder.appName("ibis_testing")
    .master("local[1]")
    .config("spark.cores.max", 1)
    .config("spark.default.parallelism", 1)
    .config("spark.driver.extraJavaOptions", "-Duser.timezone=GMT")
    .config("spark.dynamicAllocation.enabled", False)
    .config("spark.executor.extraJavaOptions", "-Duser.timezone=GMT")
    .config("spark.executor.heartbeatInterval", "3600s")
    .config("spark.executor.instances", 1)
    .config("spark.network.timeout", "4200s")
    .config("spark.rdd.compress", False)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.shuffle.compress", False)
    .config("spark.shuffle.spill.compress", False)
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.sql.shuffle.partitions", 1)
    .config("spark.storage.blockManagerSlaveTimeoutMs", "4200s")
    .config("spark.ui.enabled", False)
    .config("spark.ui.showConsoleProgress", False)
    .config("spark.sql.execution.arrow.pyspark.enabled", False)
)
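A sketch of what that could look like, folding the flag into the existing builder chain (abbreviated; the other .config(...) calls from the snippet above stay unchanged):

from pyspark.sql import SparkSession

config = (
    SparkSession.builder.appName("ibis_testing")
    .master("local[1]")
    # ... existing .config(...) calls elided ...
    .config("spark.sql.streaming.schemaInference", True)
)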
# Note: The same session can be used for both batch and streaming
# jobs in Spark. Streaming is made explicit on the source
# dataframes. This is why, we do not really need a separate
# `TestConf` class for streaming, but only need to create
# streaming counterparts for the test tables. Still added this
# class to keep the testing uniform with Flink. This class is used
# in `con_streaming()` fixture (streaming counterpart of `con()`)
# to create a new spark session and load the `***_streaming`
# tables for testing. However, either `con()` or
# `con_streaming()` can be used to execute any batch/streaming
# job. This is why, we set `autouse=True` for `con_streaming()`
# to create the streaming tables, and then rely solely on `con()`
# to operate on those tables in the tests.
It makes sense to try to keep the testing uniform, but I also wonder if it's cleaner/easier to just reuse the same test con, given this difference in behavior... It feels a little unnecessary. Maybe others have some thoughts.
Description of changes
Aims to address #8888.