PySpark: Connecting to and Reading from Hive and MySQL

一、Preliminary Setup

This guide assumes your Spark build supports Hive and is version-compatible with your Hadoop installation.
The versions used here are:

  • Hadoop 2.7.5
  • Spark 2.3
  • Anaconda 3.5
  • Hive 1.2

1) Install MySQL

2) Create the hive user and grant privileges

create user 'hive'@'%' identified by 'hive';

# Allow connections from any host to log in to MySQL as the hive user,
# and grant that user privileges on all databases. The MySQL read/write
# steps below depend on this grant. (Note: the host wildcard is '%', not '*'.)
grant all privileges on *.* to 'hive'@'%' identified by 'hive';

grant all privileges on *.* to 'root'@'%' identified by '1';

flush privileges;

3) Adjust the MySQL configuration

sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf

# 1. Add the following to lengthen the wait timeouts and avoid
#    connection failures caused by the short defaults
[mysqld]
wait_timeout=86400
interactive_timeout=7200

# 2. Change this setting so that any IP can reach the local MySQL.
#    This is important; otherwise it is only reachable via localhost
bind-address = 0.0.0.0

4) Configure hive-site.xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
    <description>username to use against metastore database</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive</value>
    <description>password to use against metastore database</description>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>master,slave1,slave2</value>
  </property>
</configuration>

5) Initialize the Hive metastore

schematool -initSchema -dbType mysql

6) Copy the MySQL JDBC driver jar into both hive/lib/ and spark/jars/

7) Copy hive/conf/hive-site.xml into spark/conf/

二、Reading and Writing Hive with PySpark

Initialize the context

from pyspark.sql import HiveContext

hive = HiveContext(sc)

Reading from Hive

st_ds = hive.sql("select * from sparktest.student limit 5")
st_ds.show()
+---+-------+------+---+
| id|   name|gender|age|
+---+-------+------+---+
|  1|Xueqian|     F| 23|
|  1|Xueqian|     F| 23|
|  1|Xueqian|     F| 23|
|  1|Xueqian|     F| 23|
|  1|Xueqian|     F| 23|
+---+-------+------+---+

Writing to Hive

st_ds.createTempView("test")
hive.sql("insert into sparktest.student select * from test")
DataFrame[]

三、Reading and Writing MySQL with PySpark

Issue: when PySpark is launched in local mode it reads MySQL normally, but when launched in yarn-client mode it cannot reach the MySQL database.

Fix: initialize the JDBC connection with jdbc:mysql://master:3306 (the actual MySQL host) instead of localhost, provided the MySQL configuration has bind-address = 0.0.0.0 as set above.
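The underlying cause: in yarn-client mode the JDBC read runs on executors on the worker nodes, where "localhost" resolves to each worker itself rather than to the MySQL host. A minimal sketch of building the URL against an explicit host (jdbc_url is a hypothetical helper, not a Spark or MySQL API):

```python
# Hypothetical helper: assemble the JDBC URL from an explicit host so
# that executors on worker nodes do not resolve "localhost" to themselves.
def jdbc_url(host, port=3306, db="hive"):
    return "jdbc:mysql://{}:{}/{}".format(host, port, db)

print(jdbc_url("master"))  # jdbc:mysql://master:3306/hive -- safe cluster-wide
```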

Reading from MySQL

mysql_df = spark.read.format('jdbc').options(url='jdbc:mysql://master:3306/hive',
                                             dbtable='VERSION', user='hive',
                                             password='hive').load()
mysql_df.show(truncate=False)
+------+--------------+---------------------------------------+
|VER_ID|SCHEMA_VERSION|VERSION_COMMENT                        |
+------+--------------+---------------------------------------+
|1     |1.2.0         |Set by MetaStore hadoop@192.168.221.138|
+------+--------------+---------------------------------------+

Writing to MySQL

# Reuse the DataFrame read from Hive above
st_ds.write.jdbc("jdbc:mysql://master:3306/test?user=hive&password=hive", "t1", mode="overwrite")
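An alternative that keeps credentials out of the URL is the properties argument of DataFrame.write.jdbc; jdbc_properties below is a hypothetical helper, and the driver class matches the one configured in hive-site.xml:

```python
# Hypothetical helper: collect JDBC connection settings into the
# properties dict accepted by DataFrame.write.jdbc / spark.read.jdbc.
def jdbc_properties(user, password, driver="com.mysql.jdbc.Driver"):
    return {"user": user, "password": password, "driver": driver}

props = jdbc_properties("hive", "hive")
# st_ds.write.jdbc("jdbc:mysql://master:3306/test", "t1",
#                  mode="overwrite", properties=props)
```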