您现在的位置是:首页 > 博文答疑 > SPARK通过JDBC对MYSQL读操作的简单例子博文答疑

SPARK通过JDBC对MYSQL读操作的简单例子

Leo2017-05-21【8】

简介SPARK通过JDBC对MYSQL读操作的简单例子。其中使用了JDBC, SCALA, SPARK. 本文意为体验JDBC在spark中的使用。

在之前,因为在配置种加了springbook后发现有版本冲突的问题,经过一天的网上的参考和自己动手调试,修改配置文件如下。貌似maven已经有比较好的版本,大家可以网上自行搜索,在这里提供SBT的配置为文件如下:

name := "SparkLearning"

version := "1.0"

scalaVersion := "2.12.2"

val overrideScalaVersion = "2.10.4"
val sparkVersion = "2.1.1"
val sparkXMLVersion = "0.3.3"
val sparkCsvVersion = "1.4.0"
val sparkElasticVersion = "2.3.4"
val sscKafkaVersion = "2.0.1"
val sparkMongoVersion = "1.0.0"
val sparkCassandraVersion = "1.6.0"

//Override Scala Version to the above 2.11.8 version
ivyScala := ivyScala.value map {
  _.copy(overrideScalaVersion = true)
}

libraryDependencies ++= Seq(
  //"org.springframework.boot" % "spring-boot-starter-parent" % "1.5.3.RELEASE",
  //"org.springframework.boot" % "spring-boot-starter-web" % "1.5.3.RELEASE",//和scala spark版本冲突,降低版本
  //"org.springframework" % "spring-core" % "4.3.7.RELEASE",
  "org.springframework.boot" % "spring-boot-starter-parent" % "1.3.2.RELEASE",
  "org.springframework.boot" % "spring-boot-starter-web" % "1.3.2.RELEASE",
  "org.apache.spark" % "spark-streaming_2.11" % "2.1.1",
  "org.apache.spark" %% "spark-core" % sparkVersion exclude("jline", "2.12"),
  "org.apache.spark" %% "spark-sql" % sparkVersion excludeAll(ExclusionRule(organization = "jline"), ExclusionRule("name", "2.12")),
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-yarn" % sparkVersion,
  "com.databricks" %% "spark-xml" % sparkXMLVersion,
  "com.databricks" %% "spark-csv" % sparkCsvVersion,
  "org.apache.spark" %% "spark-graphx" % sparkVersion,
  "org.apache.spark" %% "spark-catalyst" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  //  "com.101tec"           % "zkclient"         % "0.9",
  "org.elasticsearch" %% "elasticsearch-spark" % sparkElasticVersion,
  //  "org.apache.spark" %% "spark-streaming-kafka-0-10_2.11" % sscKafkaVersion,
  "org.mongodb.spark" % "mongo-spark-connector_2.11" % sparkMongoVersion,
  "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
  //  "dibbhatt" % "kafka-spark-consumer" % "1.0.8",
  //  "net.liftweb" %% "lift-webkit" % "2.6.2",
  "mysql" % "mysql-connector-java" % "5.1.38"
)

接下来可以看一个简单的MYSQL读操作了。记得在跑程前,记得搭建并且开启MYSQL。

package sql.sample
import java.util.Properties
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object MySQLRead {
  def main(args: Array[String]) = {
    val sparkConf = new SparkConf().setAppName("MySQLRead").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)
    //val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)//不引用包时可以这么new该对象
    val properties = new Properties()
    val url = "jdbc:mysql://localhost:3306/test" //mysql的地址和database name
    val jdbcDF = sqlContext.read.format("jdbc").options(
      Map("url" -> url,
        "user" -> "root",
        "password" -> "leo",
        "dbtable" -> "test")).load()// This is the table name which I will read
    //jdbcDF.collect().take(20).foreach(println) //we can collect directly
    jdbcDF.registerTempTable("test")
    sqlContext.sql("select * from test").collect().take(10).foreach(println)// or use SQL to collect and print
  }
}

然后右键运行,查看结果如下:

blob.png

全文完 。