create table student (id int(4), name char(20), gender char(4), age int(4)); insert into student values(1,'Xueqian','F',23); insert into student values(2,'Weiliang','M',24); cd ~ wget http://10.90.3.2/LMS/hadoop/hive/mysql-connector-java-5.1.46.jar sudo cp -R mysql-connector-java-5.1.46.jar /opt/spark-2.1.1-bin-hadoop2.7/jars jdbcDF = spark.read \ .format("jdbc") \ .option("driver","com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://localhost:3306/spark") \ .option("dbtable", "student") \ .option("user", "root") \ .option("password", "123456") \ .load() jdbcDF.show() from pyspark.sql import Row from pyspark.sql.types import * #设置模式信息,分别对应表格student的四个属性值 schema = StructType([StructField("id", IntegerType(), True), \ StructField("name", StringType(), True), \ StructField("gender", StringType(), True), \ StructField("age", IntegerType(), True)]) #设置两条数据,表示两条学生信息 studentRDD = spark \ .sparkContext \ .parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]) \ .map(lambda x:x.split(" ")) #创建Row对象,每个Row对象都是rowRDD中的一行 rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip()))) #建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来。此处得到DataFrame对象studentDF studentDF = spark.createDataFrame(rowRDD, schema) #----------写入数据库--------- prop = {} #设置要连接的mysql数据库名 prop['user'] = 'root' #设置要连接的mysql数据库登录密码 prop['password'] = '123456' #设置要连接的mysql数据库驱动 prop['driver'] = "com.mysql.jdbc.Driver" #设置要连接的mysql数据库表格名称,和插入数据的方式 studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop) select * from student; 1::661::3::978302109 1::914::3::978301968 1::3408::4::978300275 wget http://10.90.3.2/czxy/ratings.dat pip install pandas # 1.读取数据集 import pandas as pd from pyspark.sql.types import * pandas_df = pd.read_csv("/home/ubuntu/ratings.dat", sep="::") schema = StructType([StructField("user_id", StringType(), True),\ StructField("movie_id", StringType(), True), \ StructField("rating", IntegerType(), True), \ StructField("ts",StringType(),True)]) 
ratingsDF = spark.createDataFrame(pandas_df,schema)
print("1.数据集如下:")
ratingsDF.show()

# 2. Average rating per user
# FIX: this heading was missing its leading '#' in the original and was a
# syntax error in the Python transcript.
import pyspark.sql.functions as F
print("2.求每个用户的平均打分")
ratingsDF.groupby("user_id").\
    avg("rating").\
    withColumnRenamed("avg(rating)","avg_rating").\
    withColumn("avg_rating",F.round("avg_rating",3)).\
    orderBy("avg_rating",ascending=False).\
    show()

# 3. Average rating per movie (via Spark SQL on a temp view)
print("3.求每部电影的平均打分")
ratingsDF.createTempView("movie_ratings")
spark.sql("SELECT movie_id, ROUND(AVG(rating),3) AS avg_rating FROM movie_ratings GROUP BY movie_id ORDER BY avg_rating DESC").show()

# 4. Count of ratings above the overall average rating
print("4.查询大于平均打分的电影的数量")
# NOTE(review): this counts individual rating rows above the global mean,
# not distinct movies — confirm that matches the intended question.
movieCount = ratingsDF.where(ratingsDF["rating"]>ratingsDF.select(F.avg(ratingsDF["rating"])).first()["avg(rating)"]).count()
print("大于平均打分的电影的数量是:", movieCount)

# 5. Find the user with the most high (>3) ratings, then print that
#    user's overall average rating.
print(" 5.查询高分(大于3分)电影中打分次数最多的用户,给出此人打分的平均值")
user_id = ratingsDF.where(ratingsDF["rating"]>3).\
    groupBy("user_id").\
    count().\
    withColumnRenamed("count","high_rating_count"). \
    orderBy("high_rating_count", ascending=False).\
    limit(1).\
    first()["user_id"]
ratingsDF.filter(ratingsDF["user_id"]==user_id).select(F.round(F.avg("rating"),3)).show()

# 6. Average, minimum and maximum rating per user
print("6.查询每个用户的平均打分、最低打分和最高打分")
ratingsDF.groupBy("user_id").\
    agg(
        F.round(F.avg("rating"),3).alias("avg_rating"),
        F.round(F.min("rating"), 3).alias("min_rating"),
        F.round(F.max("rating"), 3).alias("max_rating")
    ).show()

# 7. TOP 10 movies by average rating among movies rated more than 100 times
print("7.查询打分次数超过100次的电影的平均分排行榜的TOP10")
ratingsDF.groupBy("movie_id").\
    agg(
        F.count("movie_id").alias("cnt"),
        F.round(F.avg("rating"),3).alias("avg_rating")
    ).where("cnt>100").\
    orderBy("avg_rating",ascending=False).\
    limit(10).\
    show()

# Sample employee.txt contents (format: id,name,age)
1,Ella,36
2,Bob,29
3,Jack,29

# employee.json contents for the exercise below. The duplicated
# "Jim"/"Damon" records are intentional test data — exercise (2) asks
# to remove duplicates — so they must not be cleaned up here.
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
为employee.json创建DataFrame,并写出Python语句完成下列操作,截图程序代码和运行结果(标明题号):
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除id字段;
(4)筛选出age>30的记录;
(5)将数据按age分组;
(6)将数据按name升序排列;
(7)取出前3行数据;
(8)查询所有记录的name列,并为其取别名为username;
(9)查询年龄age的平均值;
(10)查询年龄age的最小值。