[SPARK-25547][SQL] Pluggable JDBC connection factory #22560
Conversation
val connectionFactoryProvider: ConnectionFactoryProvider =
  parameters.get(JDBC_CONNECTION_FACTORY_PROVIDER).map { className =>
    Utils.classForName(className).newInstance.asInstanceOf[ConnectionFactoryProvider]
If the given name is wrong, this seems to end up with ClassNotFoundException. Could you add a test case for wrong names, and add fallback logic that logs a proper warning message and loads DefaultConnectionFactoryProvider instead?
Would you not want to see this exception and know you've done something wrong instead of silently falling back to the default? I could see throwing a RuntimeException with a customized message though.
Now throwing IllegalArgumentException with a reasonable message; see the unit tests.
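A minimal sketch of the loading-and-validation behavior discussed in this thread. The names follow the diff fragment above; the exact error handling here is an assumption based on the comments, not the PR's verbatim code:

```scala
// Sketch, assuming the shape of the diff above: resolve the provider class
// named in the options, wrapping reflection failures in an
// IllegalArgumentException with a readable message instead of letting a raw
// ClassNotFoundException escape. Falls back to the default provider only
// when no class name was given at all.
val connectionFactoryProvider: ConnectionFactoryProvider =
  parameters.get(JDBC_CONNECTION_FACTORY_PROVIDER).map { className =>
    try {
      Utils.classForName(className).newInstance.asInstanceOf[ConnectionFactoryProvider]
    } catch {
      case e: ClassNotFoundException =>
        throw new IllegalArgumentException(
          s"Cannot load ConnectionFactoryProvider class '$className'", e)
    }
  }.getOrElse(DefaultConnectionFactoryProvider)
```

Note the deliberate design choice from the thread: a misconfigured class name fails loudly rather than silently falling back to the default factory.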
  () => LoadDriver(options).connect(RDS.balancedUrl, options.asConnectionProperties)
}
object PluggableConnectionFactoryExample {
This is a kind of developer example. Let's have a UT case instead of this example file, PluggableConnectionFactoryExample.scala.
added unit tests
And, please remove this example file. :)
deleted example
}
object LoadDriver {
  def apply(options: JDBCOptions): Driver = {
Is this designed to be shared by 3rd party providers?
Yes, I'm also using it in the example for the RDS factory - any custom factory would have to do this or repeat the logic. It's a refactor of what was in the original JdbcUtils.createConnectionFactory
cc @gatorsmile and @cloud-fan
}
class TestFactory extends ConnectionFactoryProvider {
  override def createConnectionFactory(options: JDBCOptions): () => Connection =
Without these changes we had to copy most of the spark jdbc package into our own codebase to allow us to create our own connection factory.

Trying to understand: so here we are passing JDBCOptions, which is a Spark class. I thought one of the goals was to make the custom connection factory code not depend on Spark classes?
The intent is for the spark jdbc package to allow custom connection factories by specifying a class name in the options to spark.sql.read. That class has to implement the trait ConnectionFactoryProvider, which in this new setup would also be provided by the spark jdbc package. The only reason we had to copy code into our codebase was to make changes to JdbcUtils to allow this, and to make the classes in the spark jdbc package that use JdbcUtils.createConnectionFactory use ours instead. This PR would allow this behavior out of the box, while using the default factory when no class name for a ConnectionFactoryProvider is specified.
I see. OK.
Could you please post the example (you just deleted) in the PR description? If we merge it, it can be part of our commit message, and we can easily understand the context of this PR. Let me think about this and get back to you later.
@gatorsmile I added the sample code to the PR description.

@gatorsmile did you get a chance to think about this?
Hi, @fsauer65.

ok to test
Test build #97367 has finished for PR 22560 at commit
Test build #97369 has finished for PR 22560 at commit
tests are failing with
Not sure what the right way is to fix this, but could it be as simple as extending Serializable? I don't understand why I'm not seeing this in our environment.
Test build #97370 has finished for PR 22560 at commit
Just a question about the design: what's the relationship between the two? BTW, you can't reuse it here.

@maropu: This would certainly be another (perhaps better?) way to accomplish the behavior I'm looking for. However, I can't find where JdbcDialect is used on master to abstract connections; JDBCRDD is still using JdbcUtils to get a connection. Does this exist on some other branch?
Sorry for the late reply. Could we hold this PR since we are actively working on the data source API V2? Will revisit it after that. Also cc @gengliangwang @cloud-fan

FYI, I just marked its target version to 3.0. We will address the issue in the upcoming 3.0 release. Thanks for your PR!
@fsauer65 there is a discussion on the Spark dev list about a JDBC connector for DSv2. Not sure this is the intention there, but it would be good to find out and create a solution which covers most people's needs. @rdblue @gatorsmile is DSv2 stable enough to continue such an effort?

I think that DSv2 is stable enough to work on implementations.

Is DSv2 on master? In other words, if I resolve the merge conflict, we're good?

@fsauer65, yes. DSv2 is in master.
merging master into mine

# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
Test build #107759 has finished for PR 22560 at commit
Test build #107761 has finished for PR 22560 at commit
Can one of the admins verify this patch?

We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it!
What changes were proposed in this pull request?
Allow for pluggable connection factories in the spark jdbc package.
Without these changes we had to copy most of the spark jdbc package into our own codebase to allow us to create our own connection factory in order to load-balance queries against an AWS Aurora Postgres cluster.
How was this patch tested?
Added unit tests. We also use this at Kabbage to load-balance queries against an AWS Aurora Postgres cluster, with code like the following:
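The sample code originally posted here is not preserved in this page. A hedged reconstruction, pieced together from the fragments visible in the diff above (LoadDriver, RDS.balancedUrl, the factory closure), might look like the following; the body of RDS and the endpoint URL are illustrative assumptions, not part of the PR:

```scala
import java.sql.Connection

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// Assumed helper: picks a reader endpoint for the Aurora cluster.
// The real load-balancing logic is not shown in this thread.
object RDS {
  def balancedUrl: String = "jdbc:postgresql://aurora-reader-endpoint:5432/mydb"
}

// A custom provider, as the PR intends third parties to write one:
// implement ConnectionFactoryProvider and return a () => Connection
// that connects through the load-balanced endpoint.
class RDSBalancedFactory extends ConnectionFactoryProvider {
  override def createConnectionFactory(options: JDBCOptions): () => Connection =
    () => LoadDriver(options).connect(RDS.balancedUrl, options.asConnectionProperties)
}
```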
and then enable the factory with something like:
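The enabling snippet is also missing from this page. A hypothetical usage sketch, assuming the option is set on a standard JDBC read (the diff refers to the option key only via the constant JDBC_CONNECTION_FACTORY_PROVIDER, so the literal string below is an assumed spelling of that key):

```scala
// Select the custom provider through the JDBC options; when the option is
// absent, the PR falls back to the default connection factory.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://aurora-cluster:5432/mydb")
  .option("dbtable", "events")
  .option("connectionFactoryProvider", "com.example.RDSBalancedFactory")
  .load()
```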