本文共 2817 字,大约阅读时间需要 9 分钟。
1在IDEA中使用scala语言在spark环境中访问postgres数据库,执行相应的sql语句,返回相应的结果.
自身在访问数据库中遇到了很多的坑,就尝试多种方式,如下:使用DataFrameReader 类提供的jdbc(url,tbname,conn)方法从指定数据库读取数据
//获取spark的连接val session = SparkSession.builder() .master("local") .appName(JDBCDemo.getClass.getSimpleName) .getOrCreate()val driver:String = "org.postgresql.Driver"val url = "jdbc:postgresql://localhost:5432/postgres"val tbname = "products"Properties conn = new Properties();//增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)conn .put("user","postgres");conn .put("password","123456");conn .put("driver",driver);//SparkJdbc读取Postgresql的products表内容val jdbcDF = session.read().format("jdbc").jdbc(url ,tbname ,conn );//因为这种方式中这是方式format("jdbc")可以省略jdbcDF.show();//显示jdbcDF数据内容
说明:其中spark是指SparkSession对象,如不明白请上网查询
使用DataFrameReader 类提供的【options()方法+ load()方法】方法从指定数据库读取数据,
#format("jdbc")代表使用jdbc方式访问数据库val load:Dataset= session.read().format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") #要访问的具体的表.option("user", "username") #操作该数据库的用户名.option("password", "password") #操作该数据库的用户密码.load();//或者等价于==/* val load:Dataset[Row] = session.read.format("jdbc").options( Map("url" -> url, "dbtable" -> tname, "user" -> "postgres", "password" -> "postgres", "driver" -> driver ) ).load()*/session.close()
其中://url的格式是jdbc:postgresql://数据库IP:端口号/数据库名称;其中session是指SparkSession对象
1.3 写入数据到数据库中
提示:下面代码中的read表示DataFrame[Row]类型的对象;//写入数据库的(第一种)方法(此方法是默认模式(存在该表就直接报错)) //调用jdbc方法,方法里面的参数第一个是定义的url数据库连接,第二个是表名,第三个是Properties类的实例化对象(我们命名为conn)load.write.jdbc(url, "tableX", conn) //写入数据库的(第二种)方法:调用mode方法并传入 SaveMode.Append 参数 (就是存在该表的情况下就直接在表后面追加) load.write.mode(SaveMode.Append).jdbc(url, "tableX", conn) //写入数据库(第三种)方式,调用mode方法并传入 SaveMode.Overwrite 参数 (吐过存在该表的情况下 覆盖里面的数据) load.write.mode(SaveMode.Overwrite).jdbc(url, "tableX", conn)
val conn_str = "jdbc:postgresql://IP地址:端口号/数据库名称"classOf[com.mysql.jdbc.Driver] // 使用上一句可能会有warning,因为这是一个表达式,可以换成下面的// Class.forName("org.postgresql.Driver").newInstanceval conn = DriverManager.getConnection(conn_str, "用户名称", "密码")val conn = DriverManager.getConnection(conn_str)try { // Configure to be Read Only val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) // Execute Query val rs = statement.executeQuery("SELECT quote FROM quotes LIMIT 5") // Iterate Over ResultSet while (rs.next) { println(rs.getString("quote")) } } finally { conn.close }
提示:应为java在scala开发环境中式完全兼容的(运行在java的JVM中),因此使用掺杂使用java方式访问数据库式是完全合法的。但是建议使用scala与spark原生的方式访问数据库。
(SPark SQL 从 DB 读取数据方法和方式 scala)
(未完待续……..)
转载地址:http://srvlf.baihongyu.com/