首页 > 学院 > 开发设计 > 正文

跟天齐老师学Spark(8)--Spark RDD综合练习

2019-11-11 05:19:22
字体:
来源:转载
供稿:网友
综合练习:通过基站信息计算家庭地址和工作地址需求:根据手机信号来计算其所在的位置手机一开机,就会和附近的基站建立连接,建立连接和断开连接都会被记录到服务器上的日志,所以即使没手机有开启网络或者GPS,也可以定位手机所在的位置。基站都有一定的辐射范围,并且根据信号强度有不同的信号级别,比如2G、3G和4G信号。我们虽然不知道手机用户所在的具体位置,但是我们知道基站的位置,手机用户一旦进入基站的辐射范围,手机就会和基站之间建立连接。我们就可以计算用户大致的位置。我们就可以根据这些位置信息做一些推荐广告。比如附近的商家,你可能喜欢的商品或者服务。假如现在我们得到了一些位置数据,比如有手机号、建立连接的标记(比如1)、断开连接的标记(比如0)、建立连接的时间戳、断开连接的时间戳等字段。用断开连接的时间减去建立连接的时间就是用户在该基站下停留的时间。但是这种计算方式不是很好,因为在实际中用户可能会停留好几天的情况,或者说有建立连接但是没有断开连接的情况。所以这里面其实还会有一个会话的概念。其实基站不是一直保持连接的,它可能每隔一段时间他会自动断开一次。比如每隔一天就断开一次。每个基站都有一个基站ID,这是一个UUID。所以可能还会一个和基站相关的基站表,比如基站的id和经纬度等信息。我们应该将两个表进行join才能得到用户在基站下停留的时间等信息。这里我们先不考虑会话id的概念。我们这里只是求某个用户白天和晚上等某个时间段停留时间的从高到低进行排序。比如早晨8点到晚上6点之间停留时间最长的我们可以认为是用户的工作地点。相反,在晚上6点到第二天早上8点这段时间中停留时间最长的我们就认为是用户的住所。知道了用户的工作地点和住处,我们就可以做一些推荐了。但是还有一个问题是,一个用户可能在一天中会经过几十甚至上百个基站。我们怎么才能知道它在哪个基站下面停留的时间最长呢?而且还有一个问题,一个用户在同一个基站下路过还不止一次。比如某用户,在他公司和家之间有一个基站,他早上上班时路过某基站一次,中午回家又路过一次,晚上下班又路过一次。这样,他就会在同一个基站中路过很多次。这样在基站的服务器日志中就会记录很多条数据。我们现在要计算用户在哪个基站下停留时间最长,其实就是简单的数据切分,然后进行求和,然后进行join。为了便于理解,我们模拟了一些简单的日志数据,共4个字段:手机号码,时间戳,基站id,连接类型(1表示建立连接,0表示断开连接):基站A:
    18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1    18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1    18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0    18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0基站B:
    18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1    18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1    18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0    18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0    18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1    18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1    18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0    18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0基站C:
    18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1    18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1    18688888888,20160327081900,CC0710CC94ECC657A8561DE549D940E0,0    18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0    18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1    18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0    18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1    18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0    下面是基站表的数据,共4个字段,分别代表基站id和经纬度以及信号的辐射类型(比如2G信号、3G信号和4G信号):
    9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6    CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6    16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6    基于以上3个基站的日志数据,要求计算某个手机号码在一天之内出现最多的两个地点。因为一个手机号码可能一天当中可能会经过很多的基站,可能他在家停留了10个小时,在公司停留了8个小时,还有可能坐车的时候路过了一些基站。思路:    求每个手机号码在哪些基站下面停留的时间最长,在计算的时候,用"手机号码+基站"才能定位在哪个基站下面停留的时间,    因为每个基站下面会有很多的用户的日志数据。全国有很多的基站,每个电信分公司只负责计算自己的数据。数据存放在基站下面的机房的服务器上。一般是用过一些工具通过网络把这些数据搜集过来。搜集过来的数据量可能会很大,这些数据一般会存放到分布式的文件系统中,比如存放到HDFS中。我们可能会基于一周或者一个月的数据量来计算,时间跨度越大,计算出来的结构就越精确。相关资料在"Spark资料"中。重要:写好的spark程序,如果我不想每次都提交到spark集群上面运行,可以在程序中指定"在本地运行模式",也就是如下方式:new SparkConf().setAppName("xxxx").setMaster("local")它表示要在本地模拟一个程序来运行,它并没有把它提交到集群。但是,这种方式在linux和Mac系统中没有问题,而在Windows下会有异常。因为我们的spark程序要从hdfs中读数据,所以它要用到hadoop的InputFormat来读数据,如果要在windows下面进行本地调试,需要做一些事情。我们知道hadoop要压缩和解压缩,那么压缩和解压缩所需要的都是一些c或c++编写的库,而这些c或c++编写的库文件是不跨平台的。所以要在windows下面调试就必须先把这些库安装好。我们建议在linux下面进行调试,如果你没有Mac系统的话,可以在linux虚拟机上安装一个idea开发工具?使用Linux的图形界面来调试。下面是完整的代码:
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object MobileLocation {  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("MobileLocation").setMaster("local[2]")    val sc = new SparkContext(conf)    val lines: RDD[String] = sc.textFile(args(0))    //切分    //lines.map(_.split(",")).map(arr => (arr(0), arr(1).toLong, arr(2), args(3)))    val splited = lines.map(line => {      val fields = line.split(",")      val mobile = fields(0)      val lac = fields(2)      val tp = fields(3)      val time = if(tp == "1") -fields(1).toLong else fields(1).toLong      //拼接数据      ((mobile, lac), time)    })    //分组聚合    val reduced : RDD[((String, String), Long)] = splited.reduceByKey(_+_)    val lmt = reduced.map(x => {      //(基站,(手机号, 时间))      (x._1._2, (x._1._1, x._2))    })    //连接    val lacInfo: RDD[String] = sc.textFile(args(1))    //整理基站数据    val splitedLacInfo = lacInfo.map(line => {      val fields = line.split(",")      val id = fields(0)      val x = fields(1)      val y = fields(2)      (id, (x, y))    })    //连接jion    val joined: RDD[(String, ((String, Long), (String, String)))] = lmt.join(splitedLacInfo)    PRintln(joined.collect().toBuffer)    sc.stop()  }}
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表