
[SPARK-25547][SQL] Pluggable JDBC connection factory #22560

Closed · wants to merge 9 commits

Conversation

@fsauer65 commented Sep 26, 2018

What changes were proposed in this pull request?

Allow for pluggable connection factories in the Spark JDBC package.

  • a new option in JDBCOptions called JDBC_CONNECTION_FACTORY_PROVIDER
  • changes to JdbcUtils.createConnectionFactory to use the above
  • when unspecified, the existing DefaultConnectionFactoryProvider is used
  • unit tests are provided

Without these changes we had to copy most of the Spark JDBC package into our own codebase
in order to create our own connection factory and load-balance queries against
an AWS Aurora Postgres cluster.
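
To make the shape of the change concrete, here is a minimal sketch of the provider trait as proposed; the package path is an assumption, and a later commit additionally makes the trait extend Serializable (see the test-failure discussion further down):

// Sketch only; location assumed from this PR's discussion.
package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.Connection

// Implementations are named via the JDBC_CONNECTION_FACTORY_PROVIDER option and
// must produce a factory of JDBC connections from the given options.
trait ConnectionFactoryProvider {
  def createConnectionFactory(options: JDBCOptions): () => Connection
}

When the option is absent, JdbcUtils falls back to DefaultConnectionFactoryProvider, preserving today's behavior.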

How was this patch tested?

Added unit tests. We also use this at Kabbage to load-balance queries against an AWS Aurora Postgres cluster, with code like the following:

package com.kabbage.rds

import java.sql.Connection

import scala.collection.JavaConverters._

import com.amazonaws.services.rds.AmazonRDSClientBuilder
import com.typesafe.config.{Config, ConfigFactory}

// Paths assumed: ConnectionFactoryProvider and LoadDriver are introduced by this PR
// alongside the existing JDBCOptions.
import org.apache.spark.sql.execution.datasources.jdbc.{ConnectionFactoryProvider, JDBCOptions, LoadDriver}

class RDSLoadBalancingConnectionFactory extends ConnectionFactoryProvider {
  override def createConnectionFactory(options: JDBCOptions): () => Connection = {
    () => LoadDriver(options).connect(RDS.balancedUrl, options.asConnectionProperties)
  }
}

object RDS {
  lazy val config: Config =
    ConfigFactory.load(s"application-${System.getProperty("env")}").getConfig("jobconfig")

  lazy val clusterId = config.getString("rds.cluster")

  // Requires the AWS_REGION environment variable as well as AWS credentials.
  lazy val rds = AmazonRDSClientBuilder.defaultClient()

  private var endpoints: Seq[String] = null

  def balancedUrl: String = this.synchronized {
    // Initialize the list of available cluster endpoints, or rotate it by one.
    endpoints = if (endpoints == null) {
      rds.describeDBInstances().getDBInstances.asScala
        .filter(i => i.getDBClusterIdentifier == clusterId && i.getDBInstanceStatus == "available")
        .map(instance => s"${instance.getEndpoint.getAddress}:${instance.getEndpoint.getPort}")
    } else endpoints.drop(1) ++ endpoints.take(1)
    endpoints.mkString("jdbc:postgresql://", ",", "/db")
  }
}

and then enable the factory with something like:

    spark.sqlContext.read
      .format("jdbc")
      .option(JDBCOptions.JDBC_CONNECTION_FACTORY_PROVIDER, "com.kabbage.rds.RDSLoadBalancingConnectionFactory")
      .options(connectionProps)
      .option(JDBCOptions.JDBC_TABLE_NAME, table)
      .load()


Review thread on the provider-loading code in JdbcUtils:

    val connectionFactoryProvider: ConnectionFactoryProvider =
      parameters.get(JDBC_CONNECTION_FACTORY_PROVIDER).map { className =>
        Utils.classForName(className).newInstance.asInstanceOf[ConnectionFactoryProvider]

Member:
If the given name is wrong, this seems to end up with ClassNotFoundException. Could you add a test case for wrong names, plus fallback logic that logs a proper warning and loads DefaultConnectionFactoryProvider instead?

@fsauer65 (Author) Sep 26, 2018:
Would you not want to see this exception and know you've done something wrong, instead of silently falling back to the default? I could see throwing a RuntimeException with a customized message, though.

Author:
Now throwing IllegalArgumentException with a reasonable message; see the unit tests.
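
A hypothetical sketch of the resulting fail-fast loading logic (the exact message wording lives in the PR's unit tests and is assumed here):

// Sketch, not the PR's exact code: reject a bad class name loudly instead of
// silently falling back to the default provider.
val connectionFactoryProvider: ConnectionFactoryProvider =
  parameters.get(JDBC_CONNECTION_FACTORY_PROVIDER).map { className =>
    try {
      Utils.classForName(className).newInstance.asInstanceOf[ConnectionFactoryProvider]
    } catch {
      case e: ClassNotFoundException =>
        // Assumed message wording.
        throw new IllegalArgumentException(
          s"Cannot load connection factory provider class: $className", e)
    }
  }.getOrElse(DefaultConnectionFactoryProvider)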

Review thread on the example file PluggableConnectionFactoryExample.scala:

    () => LoadDriver(options).connect(RDS.balancedUrl, options.asConnectionProperties)
  }

object PluggableConnectionFactoryExample {

Member:
This is a kind of developer example. Let's have a UT case instead of this example file, PluggableConnectionFactoryExample.scala.

Author:
Added unit tests.

Member:
And, please remove this example file. :)

Author:
Deleted the example.

Review thread on LoadDriver:

object LoadDriver {
  def apply(options: JDBCOptions): Driver = {

Member:
Is this designed to be shared by 3rd-party providers?

@fsauer65 (Author) Sep 26, 2018:
Yes, and I'm also using it in the example for the RDS factory; any custom factory would have to do this or repeat the logic. It's a refactor of what was in the original JdbcUtils.createConnectionFactory.
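
A plausible sketch of LoadDriver, assuming it factors out the driver-lookup logic that lived inline in JdbcUtils.createConnectionFactory in Spark 2.x (DriverRegistry and DriverWrapper are existing Spark internals in the same package):

import java.sql.{Driver, DriverManager}
import scala.collection.JavaConverters._

// Assumed sketch: resolve the registered JDBC Driver for options.driverClass,
// matching Spark's DriverWrapper when the driver was loaded by DriverRegistry.
object LoadDriver {
  def apply(options: JDBCOptions): Driver = {
    val driverClass: String = options.driverClass
    DriverRegistry.register(driverClass)
    DriverManager.getDrivers.asScala.collectFirst {
      case d: DriverWrapper if d.wrapped.getClass.getCanonicalName == driverClass => d
      case d if d.getClass.getCanonicalName == driverClass => d
    }.getOrElse {
      throw new IllegalStateException(
        s"Did not find registered driver with class $driverClass")
    }
  }
}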

@dongjoon-hyun (Member):

cc @gatorsmile and @cloud-fan

Review thread on TestFactory:

class TestFactory extends ConnectionFactoryProvider {
  override def createConnectionFactory(options: JDBCOptions): () => Connection =

Contributor:
"Without these changes we had to copy most of the spark jdbc package into our own codebase to allow us to create our own connection factory"

Trying to understand: here we are passing JDBCOptions, which is a Spark class. I thought one of the goals was to make the custom connection factory code not depend on Spark classes?

Author:
The intent is for the Spark JDBC package to allow custom connection factories by specifying a class name in the options to spark.sql.read. That class has to implement the trait ConnectionFactoryProvider, which in this new setup would also be provided by the Spark JDBC package. The only reason we had to copy code into our codebase was to change JdbcUtils to allow this, and to make the classes in the Spark JDBC package that use JdbcUtils.createConnectionFactory use ours instead. This PR allows that behavior out of the box, falling back to the default factory when no ConnectionFactoryProvider class name is specified.

Contributor:
I see. OK.

@gatorsmile (Member) commented Sep 27, 2018:

Could you please post the example (you just deleted) in the PR description? If we merge it, it can be part of our commit message, so we can easily understand the context of this PR.

Let me think about this and I will get back to you later.

@fsauer65 (Author):

@gatorsmile I added the sample code to the PR description.

@fsauer65 (Author) commented Oct 8, 2018:

@gatorsmile did you get a chance to think about this?

@dongjoon-hyun (Member) commented Oct 14, 2018:

Hi, @fsauer65.

  • We are in the middle of Spark 2.4.0 RC voting, so we are being careful and not active on new features. Sorry for the delays.
  • Please fix the build failure.
  • Could you change [Spark Core] to [SQL]?

@dongjoon-hyun (Member):

ok to test

@SparkQA commented Oct 14, 2018:

Test build #97367 has finished for PR 22560 at commit bc4f671.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fsauer65 fsauer65 changed the title [SPARK-25547][Spark Core] Pluggable JDBC connection factory [SPARK-25547][SQL] Pluggable JDBC connection factory Oct 14, 2018
@SparkQA commented Oct 15, 2018:

Test build #97369 has finished for PR 22560 at commit 526a1d0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@fsauer65 (Author):

Tests are failing with:

Caused by: sbt.ForkMain$ForkError: java.io.NotSerializableException: org.apache.spark.sql.execution.datasources.jdbc.DefaultConnectionFactoryProvider$
Serialization stack:
	- object not serializable (class: org.apache.spark.sql.execution.datasources.jdbc.DefaultConnectionFactoryProvider$, value: org.apache.spark.sql.execution.datasources.jdbc.DefaultConnectionFactoryProvider$@7d4b4fa8)

Not sure what the right way to fix this is, but could it be as simple as extending Serializable? I don't understand why I'm not seeing this in our environment.
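
The next commit appears to take exactly that route; per the test build report below, the trait now extends Serializable, which lets the provider be captured by the connection-factory closures Spark serializes into tasks (method signature taken from the review diff above):

// The one-line fix: the provider travels with serialized task closures, so the
// trait itself must be Serializable.
trait ConnectionFactoryProvider extends Serializable {
  def createConnectionFactory(options: JDBCOptions): () => Connection
}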

@SparkQA commented Oct 15, 2018:

Test build #97370 has finished for PR 22560 at commit f5671a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait ConnectionFactoryProvider extends Serializable

@maropu (Member) commented Oct 15, 2018:

Just a question about the design: what's the relationship between JdbcDialect and this provider? On master, we now abstract JDBC connections via JdbcDialect.

BTW, can't you reuse PostgresDialect when connecting to an AWS Aurora Postgres cluster? Do we need to add a new interface to do so?

@fsauer65 (Author):

@maropu: This would certainly be another (perhaps better?) way to accomplish the behavior I'm looking for. However, I can't find where JdbcDialect is used on master to abstract connections; JDBCRDD is still using JdbcUtils to get a connection. Does this exist on some other branch?

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

@gatorsmile (Member):

Sorry for the late reply. Could we hold this PR, since we are actively working on the data source API V2? We will revisit it after that. Also cc @gengliangwang @cloud-fan.

FYI, I just marked its target version as 3.0. We will address the issue in the upcoming 3.0 release.

Thanks for your PR!

@gaborgsomogyi (Contributor):

@fsauer65 there is a discussion on the Spark dev list about a JDBC connector for DSv2. Not sure this is the intention there, but it would be good to find out and create a solution which covers most people's needs.

@rdblue @gatorsmile is DSv2 stable enough to continue such an effort?

@rdblue (Contributor) commented Jul 13, 2019:

I think that DSv2 is stable enough to work on implementations.

@fsauer65 (Author):

Is DSv2 on master? In other words, if I resolve the merge conflict, we're good?

@rdblue (Contributor) commented Jul 16, 2019:

@fsauer65, yes. DSv2 is in master.

Commit: "merging master into mine"
# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@SparkQA commented Jul 16, 2019:

Test build #107759 has finished for PR 22560 at commit 566c70f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 17, 2019:

Test build #107761 has finished for PR 22560 at commit edd3245.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins:

Can one of the admins verify this patch?

@github-actions bot commented Jan 6, 2020:

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Jan 6, 2020
@github-actions github-actions bot closed this Jan 7, 2020