Functions that run successfully in SparkR
Views: 7,066
Published: 2019-06-28

This post is about 5,463 characters; estimated reading time: 18 minutes.

The idea behind SparkR in Spark 1.4.0: use Spark to extract a small dataset (a SparkR DataFrame) from a big one, then analyze it in R (as an R data.frame).

The two DataFrames are different things: the former is distributed, a DataFrame living on the cluster, and the usual R packages cannot operate on it; the latter is a single-machine data.frame, on which every package function works.
My guess at the SparkR roadmap is that the functions in today's R packages will gradually be ported to the SparkR DataFrame, which would open up a whole new world.
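
To make that workflow concrete, here is a minimal sketch of the big-to-small round trip under the Spark 1.4-era SparkR API; the table name anjuke_scores matches the queries below, while the master URL and app name are placeholders.

library(SparkR)
sc <- sparkR.init(master = "local[2]", appName = "sparkr-demo")  # start SparkR
hiveContext <- sparkRHive.init(sc)   # Hive context, used by the queries below

big <- sql(hiveContext, "SELECT * FROM anjuke_scores")   # distributed SparkR DataFrame
small <- filter(big, big$restaurant >= 10)               # the filter runs on the cluster

local_df <- collect(small)   # pull the small result into an ordinary R data.frame
summary(local_df)            # from here on, any R package applies
sparkR.stop()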

> a<- sql(hiveContext, "SELECT count(*) FROM anjuke_scores where restaurant>=10");

> a<- sql(hiveContext, "SELECT * FROM anjuke_scores limit 5")> aDataFrame[city:string, housingname:string, ori_traffic_score:int, ori_traffic_score_normal:double, metro_station:double, metro_station_normal:double,...
> first(a)  # show the first row of the formal (SparkR) DataFrame

> head(a) ; # list the first 6 rows of a
> columns(a) # list all the column names
[1] "city" "housingname" "ori_traffic_score" "ori_traffic_score_normal"
[5] "metro_station" "metro_station_normal" "bus_station" "bus_station_normal" ...

> showDF(a)


> b<-filter(a, a$ori_comfort>8); # row filtering; ori_comfort_normal:double
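
As an aside, filter in SparkR 1.4 also accepts the condition as a SQL string, which helps when conditions are built programmatically; a small sketch on the same column:

b <- filter(a, "ori_comfort > 8")   # string-form condition, equivalent to the line above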

> print(a);    # print the column names and types: DataFrame[city:string, housingname:string, ori_traffic_score:int, ......

> printSchema(a); # print the schema as a tree of column names and types
root
 |-- city: string (nullable = true)
 |-- housingname: string (nullable = true)
 |-- ori_traffic_score: integer (nullable = true)
 |-- ori_traffic_score_normal: double (nullable = true)
 |-- metro_station: double (nullable = true)
> take(a,10) ; # take(x, num): pull the first num rows of the formal DataFrame into an ordinary R data.frame
city housingname ori_traffic_score ori_traffic_score_normal metro_station metro_station_normal
1  <mojibake>  <mojibake>  NA  0  NA  0
2  <mojibake>  <mojibake>  NA  0  NA  0
...
10 <mojibake>  <mojibake>  NA  0  NA  0
(city and housingname are Chinese strings that this console prints as mis-encoded bytes; the score columns in these rows are all NA / 0)

> b<-take(a,10)
> dim(b)
[1] 10 41

> aa <- withColumn(a, "ori_comfort_aa", a$ori_comfort * 5)   # derive a new column from an existing one; the result, with the extra column ori_comfort_aa, is still a formal (SparkR) DataFrame
> printSchema(aa)
root
 |-- city: string (nullable = true)
.........
 |-- comfort_normal: double (nullable = true)
 |-- ori_comfort_aa: double (nullable = true)

> aa <- mutate(a, newCol1 = a$commerce_normal * 5, newCol2 = a$bank_normal * 2) ; # similar to withColumn
> printSchema(aa)
root
|-- city: string (nullable = true)
......
|-- comfort_normal: double (nullable = true)
|-- newCol1: double (nullable = true)
|-- newCol2: double (nullable = true)
a1<-arrange(a,asc(a$level_tow)); # sort by a column; asc = ascending, desc = descending
a1<-orderBy(a,asc(a$level_tow)); # sort by a column
count(a) ; # count how many rows the formal DataFrame has

> dtypes(a);  # list every column name and type of the formal DataFrame, as an R list
[[1]]
[1] "city"   "string"
[[2]]
[1] "housingname" "string"


> a<-withColumnRenamed(a,"comfort_normal","AA");  # rename a column
> printSchema(a)
root
 |-- city: string (nullable = true)
 |-- housingname: string (nullable = true)
..........
 |-- AA: double (nullable = true)

Functions for creating a SparkR DataFrame

createDataFrame

> df<-createDataFrame(sqlContext,a.df);  # a.df is an R data.frame, df is a SparkR DataFrame; note that creating a SparkR DataFrame requires the sqlContext

> str(a.df)
'data.frame': 5 obs. of 41 variables:
> str(df)
Formal class 'DataFrame' [package "SparkR"] with 2 slots
..@ env:<environment: 0x4fce350> 
..@ sdf:Class 'jobj' <environment: 0x4fc70b0> 
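
The a.df above is the author's own data; the same round trip works with any R data.frame. A minimal self-contained sketch, assuming only an initialized sqlContext:

df <- createDataFrame(sqlContext, faithful)   # faithful ships with base R
printSchema(df)     # eruptions and waiting, both double
head(collect(df))   # bring the rows back as an ordinary R data.frame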
> destDF <- select(SFO_DF, "dest", "cancelled"); # select columns
> showDF(destDF); # display a SparkR DataFrame
+----+---------+
|dest|cancelled|
+----+---------+
| SFO| 0|
................
> registerTempTable(SFO_DF, "flightsTable"); # to run SQL against a SparkR DataFrame, first register it as a table
> wa <- sql(sqlContext, "SELECT dest, cancelled FROM flightsTable"); # run the SQL statement under the sqlContext
> showDF(wa); # the query result is still a SparkR DataFrame
+----+---------+
|dest|cancelled|
+----+---------+
| SFO| 0|
................
> local_df <- collect(wa); # convert the SparkR DataFrame into an R data.frame
> str(local_df)
'data.frame': 2818 obs. of 2 variables:
$ dest : chr "SFO" "SFO" "SFO" "SFO" ...
$ cancelled: int 0 0 0 0 0 0 0 0 0 0 ...
> wa<-flights_df[1:1000,]; # wa is an R data.frame
> flightsDF<-createDataFrame(sqlContext,wa) ; # flightsDF is a SparkR DataFrame
> library(magrittr); # the pipe operator also works on SparkR DataFrames
> groupBy(flightsDF, flightsDF$date) %>%
+ summarize(avg(flightsDF$dep_delay), avg(flightsDF$arr_delay)) -> dailyDelayDF; # note: the syntax differs from dplyr's; the result is still a SparkR DataFrame (a non-piped version follows the output below)
> str(dailyDelayDF)
Formal class 'DataFrame' [package "SparkR"] with 2 slots
..@ env:<environment: 0x4cd3118> 
..@ sdf:Class 'jobj' <environment: 0x4cd6968> 
> showDF(dailyDelayDF)
+----------+--------------------+--------------------+
| date| AVG(dep_delay)| AVG(arr_delay)|
+----------+--------------------+--------------------+
|2011-01-01| 5.2| 5.8|
|2011-01-02| 1.8333333333333333| -2.0|
................
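
For readers who skip magrittr, here is the same aggregation without pipes, sketched against the same flightsDF:

gd <- groupBy(flightsDF, flightsDF$date)   # groupBy returns a GroupedData object
dailyDelayDF <- summarize(gd, avg(flightsDF$dep_delay), avg(flightsDF$arr_delay))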
(The runs below were done on machine 39.)
collect converts a SparkR DataFrame into an R data.frame
Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.
collect(x, stringsAsFactors = FALSE), where x is a SparkSQL DataFrame
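
A small sketch of the stringsAsFactors switch, assuming sdf is any SparkR DataFrame with a character column dest (hypothetical):

chr_df <- collect(sdf)                            # character columns come back as chr (the default)
fac_df <- collect(sdf, stringsAsFactors = TRUE)   # character columns come back as factors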
> dist_df<- sql(hiveContext, "SELECT * FROM anjuke_scores where restaurant<=1");
> local_df <- dist_df %>% 
groupBy(dist_df$city) %>% 
summarize(count = n(dist_df$housingname)) %>% 
collect
> local_df
city count
1 <mojibake> 5
2 <mojibake> 7
3 <mojibake> 2
..........
..........
take can also convert a SparkR DataFrame into an R data.frame
Takes the first num rows of a DataFrame and returns the results as a data.frame.
take(x, num)
> local_df <- dist_df %>% 
groupBy(dist_df$city) %>% 
summarize(count = n(dist_df$housingname))
> a<-take(local_df,100)
[Stage 16:=========================================> (154 + 1) / 199]
> View(a)
> a
city count
1 <mojibake> 5
2 <mojibake> 7
3 <mojibake> 2
..........
..........
Functions that did not work:

> describe(a)
Error in x[present, drop = FALSE] : object of type 'S4' is not subsettable


> jfkDF <- filter(flightsDF, flightsDF$dest == "DFW")
Error in filter(flightsDF, flightsDF$dest == "DFW") : no method for coercing this S4 class to a vector
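
Both failures look like generic-function masking rather than missing SparkR features: base R's stats::filter (and, in the case of describe, a same-named function from some other attached package) can shadow the SparkR methods, and the shadowing function then fails on the S4 DataFrame. That diagnosis is an assumption about the author's session, not something the post confirms; when it is the cause, qualifying the calls with the package namespace works around it:

jfkDF <- SparkR::filter(flightsDF, flightsDF$dest == "DFW")   # bypass any masking of filter
stats_df <- SparkR::describe(flightsDF)   # assumes the installed SparkR exports describe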

Reposted from: http://bpxll.baihongyu.com/
