Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48243][SQL][HIVE] Support push down char and varchar predicates to Hive Metastore #46539

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// SPARK-48243: flag gating pushdown of char/varchar partition predicates to the
// Hive Metastore (the shim's SupportedAttribute consults this via
// SQLConf.metastorePartitionPruningVarchar). Pushdown is possible since HIVE-26661;
// defaults to false. NOTE(review): key name says "Varchar" but the flag covers both
// char and varchar, per the doc text — confirm naming is intentional.
val HIVE_METASTORE_PARTITION_PRUNING_VARCHAR =
buildConf("spark.sql.hive.metastorePartitionPruningVarchar")
.doc("When true, predicates of char and varchar type will also be pushed down into " +
"the Hive Metastore.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

val HIVE_MANAGE_FILESOURCE_PARTITIONS =
buildConf("spark.sql.hive.manageFilesourcePartitions")
.doc("When true, enable metastore partition management for file source tables as well. " +
Expand Down Expand Up @@ -5275,6 +5283,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
// Accessor for HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK (semantics documented at
// the conf's definition site, not visible in this hunk).
def metastorePartitionPruningFastFallback: Boolean =
getConf(HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK)

// Whether char/varchar predicates may be pushed down to the Hive Metastore
// (reads HIVE_METASTORE_PARTITION_PRUNING_VARCHAR, default false).
def metastorePartitionPruningVarchar: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING_VARCHAR)

// Accessor for HIVE_MANAGE_FILESOURCE_PARTITIONS: metastore partition management for
// file source tables (see the conf's own doc for full semantics).
def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)

// Accessor for HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE — a cache size limit
// (presumably bytes; TODO confirm unit at the conf's definition site).
def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,16 +723,18 @@ private[client] class Shim_v2_0 extends Shim with Logging {
}
}

object SupportedAttribute {
// hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
private val varcharKeys = table.getPartitionKeys.asScala
// hive varchar is treated as catalyst string, which can be pushed down to
// Hive Metastore since HIVE-26661.
lazy val varcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet

object SupportedAttribute {
def unapply(attr: Attribute): Option[String] = {
val resolver = SQLConf.get.resolver
if (varcharKeys.exists(c => resolver(c, attr.name))) {
if (!SQLConf.get.metastorePartitionPruningVarchar &&
varcharKeys.exists(c => resolver(c, attr.name))) {
None
} else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType ||
attr.dataType == DateType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ class FiltersSuite extends SparkFunSuite with PlanTest {
InSet(a("strcol", StringType), Set("1", "2").map(s => UTF8String.fromString(s))) :: Nil,
"(strcol = \"1\" or strcol = \"2\")")

filterTest("skip varchar",
(Literal("") === a("varchar", StringType)) :: Nil,
"")

filterTest("SPARK-19912 String literals should be escaped for Hive metastore partition pruning",
(a("stringcol", StringType) === Literal("p1\" and q=\"q1")) ::
(Literal("p2\" and q=\"q2") === a("stringcol", StringType)) :: Nil,
Expand Down Expand Up @@ -250,5 +246,20 @@ class FiltersSuite extends SparkFunSuite with PlanTest {
}
}


test("SPARK-48243: Support push down char and varchar to Hive metastore") {
  // The same varchar predicate is converted under both settings of the flag:
  // dropped entirely when pushdown is disabled, rendered as a metastore filter
  // string when enabled.
  Seq("false" -> "", "true" -> "\"\" = varchar").foreach { case (flag, expected) =>
    withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_VARCHAR.key -> flag) {
      val predicate = Literal("") === a("varchar", StringType)
      assert(shim.convertFilters(testTable, Seq(predicate)) == expected)
    }
  }
}

// Test shorthand: build an AttributeReference for a column of the given name/type.
private def a(name: String, dataType: DataType) = {
  // Empty second argument list supplies the default qualifier/exprId.
  AttributeReference(name, dataType)()
}
}