Support partially specified writes from case classes #1139

Open · wants to merge 50 commits into b2.0
Conversation

@aashishs101

Currently, we cannot use rdd.saveToCassandra on an RDD[CaseClass] if CaseClass does not contain all of the fields of the target table; the write fails with:

java.lang.IllegalArgumentException: requirement failed: Columns not found in CaseClass: [missing_field1, missing_field2]

This is limiting when columns have been added to the table by an external process: any job that writes to the table then fails automatically. This PR makes the failure case more permissive: if the case class matches all of the first n columns of the target table, we attempt the write.
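A minimal sketch of the scenario (keyspace, table, and column names are invented for illustration):

```scala
import com.datastax.spark.connector._

// Assume table test.users was created with columns (id, name, created_at),
// where created_at was added later by an external process.
case class User(id: Int, name: String)  // no created_at field

val users = sc.parallelize(Seq(User(1, "alice"), User(2, "bob")))

// Before this PR: IllegalArgumentException, "Columns not found in User: [created_at]".
// With this PR: the case class matches the first n columns, so the write is attempted.
users.saveToCassandra("test", "users")
```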

RussellSpitzer and others added 30 commits February 28, 2017 14:12
SPARKC-476: SessionProxy Should proxy all Runtime Interfaces
SPARKC-466: Add a CassandraRDDMock for end users to use in Unit Testing
The CassandraRDDMock just passes through another RDD and pretends it is a CassandraRDD.
SPARKC-475: Add implicit RowWriterFactory for RDD[Row]
Previously, when a DataFrame was turned into an RDD by its `rdd` method, saveToCassandra and joinWithCassandraTable would fail for lack of an implicit RowWriterFactory. To fix this, we add an implicit RowWriterFactory for RDD[T <: Row], which maps to the SqlRowWriterFactory we already have (a usage sketch follows this commit list).
Fix doc links
Adds Python Dictionary as Kwargs Example
More examples, Common issues
Minor edits for consistency
Text and link edits
Refresh Documentation to Use Spark 2.X concepts
* SPARKC-492: Protect against Size Estimate Overflows

* SPARKC-491: add java.time classes support to converters and sparkSQL

* SPARKC-470: Allow Writes to Static Columns and Partition Keys
Link to spark-connector Slack channel at DataStax Academy Slack
For some reason the logic for finding the module names was not working (at least on my Mac). Here I simplify it by iterating over the two folders we know about, ensuring the output files exist in their own separate folders.
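A short sketch of the SPARKC-475 scenario mentioned above (keyspace, table, and DataFrame names are assumed):

```scala
import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val df = spark.read.table("source_table")   // any DataFrame
val rows: RDD[Row] = df.rdd                 // dropping down to the RDD API

// Previously this line failed to compile for lack of an implicit RowWriterFactory;
// the new implicit for RDD[T <: Row] routes it through SqlRowWriterFactory.
rows.saveToCassandra("ks", "tbl")
```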
@datastax-bot

Hi @aashishs101, thanks for your contribution!

In order for us to evaluate and accept your PR, we ask that you sign the Spark Cassandra Connector CLA. It's all electronic and will take just minutes.

@datastax-bot

Thank you @aashishs101 for signing the Spark Cassandra Connector CLA.


it should "throw a meaningful exception when a column has an incorrect type" in {
val userTable = newTable(loginColumn, addressColumn)
val user = UserWithUnknownType("foo", new UnknownType)
Contributor

Why was this test changed?

val mappedColumns = getterMap.values.toSet
val unmappedColumns = selectedColumns.filterNot(mappedColumns)
require(unmappedColumns.isEmpty, s"Columns not found in $tpe: [${unmappedColumns.mkString(", ")}]")
require(selectedColumns.endsWith(unmappedColumns), s"Unmapped columns must be at end of table definition: [${unmappedColumns.mkString(", ")}]")
Contributor

Not sure what this means, seems like it would only match if unmappedColumns was in the same order as selectedColumns

@@ -19,13 +19,12 @@ class DefaultRowWriter[T : TypeTag : ColumnMapper](

  override def readColumnValues(data: T, buffer: Array[Any]) = {
    val row = converter.convert(data)
-   for (i <- columnNames.indices)
+   for (i <- row.columnValues.indices)
Contributor

This returns values in the order of the data and not in the order of the selected columns?

Author

It shouldn't change the order: these are just indices into the Seq, and the actual data access happens on the next line, where elements of the row and buffer are read and written. In most cases the number of table columns equals the number of values in the row. In the case we're interested in (an underspecified case class), though, the table has more columns than the row has values, so we want to iterate over the smaller count.
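A minimal sketch of the index change (names and sizes invented for illustration):

```scala
val columnNames  = IndexedSeq("id", "name", "created_at")  // 3 table columns selected
val columnValues = IndexedSeq[Any](1, "alice")             // underspecified case class: 2 values
val buffer = new Array[Any](columnNames.size)

// Old loop: for (i <- columnNames.indices) would index past the end of columnValues.
for (i <- columnValues.indices)  // 0 until 2: bounded by the row, not the table
  buffer(i) = columnValues(i)
```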

Contributor

makes sense

if (!isTopLevel)
  require(unmappedColumns.isEmpty, s"Columns not found in nested $tpe: [${unmappedColumns.mkString(", ")}]")
else
  require(selectedColumns.endsWith(unmappedColumns), s"Unmapped columns must be at end of table definition: [${unmappedColumns.mkString(", ")}]")
Author

@RussellSpitzer the comment you had here was removed after my most recent push, but I still wanted to address it. Because unmappedColumns is created as a filter on selectedColumns (line 109), the two sequences are in the same order. So this requirement essentially asserts that the first n columns of the table must be mapped by the case class, while subsequent columns can go unmapped.
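A toy illustration of that ordering argument (column names invented):

```scala
val selectedColumns = Seq("a", "b", "c", "d")  // order from the table definition
val mappedColumns   = Set("a", "b")            // getterMap.values.toSet in the real code
val unmappedColumns = selectedColumns.filterNot(mappedColumns)  // Seq("c", "d"): filterNot preserves order

selectedColumns.endsWith(unmappedColumns)      // true: only trailing columns are unmapped
// If "b" were unmapped instead, unmappedColumns would be Seq("b", "d"),
// endsWith would be false, and the require would fail.
```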

@aashishs101
Author

@RussellSpitzer, any word on this?

@RussellSpitzer
Contributor

Do we have a SPARKC ticket for this yet? I couldn't find one in Jira:
https://datastax-oss.atlassian.net/issues/

@aashishs101
Author

Sorry, I hadn't created one until now. Here it is: https://datastax-oss.atlassian.net/browse/JAVA-1676

@RussellSpitzer
Contributor

Going to try to trigger jenkins from here

@RussellSpitzer
Contributor

test this please

@RussellSpitzer
Contributor

Tests running!

@ds-jenkins-builds

Build against Scala 2.10 finished with success

@ds-jenkins-builds

Build against Scala 2.11 finished with success

@RussellSpitzer
Contributor

So are we sure we want to aim this patch at master? I don't think we are going to do another big public release, so if we want this capability it would be nice to find a way of back-porting it to b1.6 or b2.0. I know we slightly change one method signature, but since it's an internal API it should be OK even on the earlier branches... thoughts?

@aashishs101
Author

@RussellSpitzer, since it is an internal method with a default parameter, I'm guessing it shouldn't change much? I'm also not sure about all of the ramifications of doing a backport vs. a normal release. Would I need to make a new PR that reapplies these changes against b2.0?

@aashishs101
Author

Hey @RussellSpitzer, any update on this?

@RussellSpitzer
Contributor

Sorry yeah, you need to change the PR target to b2.0

@aashishs101 aashishs101 changed the base branch from master to b2.0 December 15, 2017 18:55
@aashishs101
Author

test this please

Labels: none yet
Projects: none yet
7 participants