18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1 18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1 18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0 18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0基站B:18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1 18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1 18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0 18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0 18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1 18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1 18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0 18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0基站C:18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1 18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1 18688888888,20160327081900,CC0710CC94ECC657A8561DE549D940E0,0 18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0 18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1 18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0 18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1 18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0 下面是基站表的数据,共4个字段,分别代表基站id和经纬度以及信号的辐射类型(比如2G信号、3G信号和4G信号):9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6 CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6 16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6 基于以上3个基站的日志数据,要求计算某个手机号码在一天之内出现最多的两个地点。因为一个手机号码可能一天当中可能会经过很多的基站,可能他在家停留了10个小时,在公司停留了8个小时,还有可能坐车的时候路过了一些基站。思路: 求每个手机号码在哪些基站下面停留的时间最长,在计算的时候,用"手机号码+基站"才能定位在哪个基站下面停留的时间, 因为每个基站下面会有很多的用户的日志数据。全国有很多的基站,每个电信分公司只负责计算自己的数据。数据存放在基站下面的机房的服务器上。一般是用过一些工具通过网络把这些数据搜集过来。搜集过来的数据量可能会很大,这些数据一般会存放到分布式的文件系统中,比如存放到HDFS中。我们可能会基于一周或者一个月的数据量来计算,时间跨度越大,计算出来的结构就越精确。相关资料在"Spark资料"中。重要:写好的spark程序,如果我不想每次都提交到spark集群上面运行,可以在程序中指定"在本地运行模式",也就是如下方式:new SparkConf().setAppName("xxxx").setMaster("local")它表示要在本地模拟一个程序来运行,它并没有把它提交到集群。但是,这种方式在linux和Mac系统中没有问题,而在Windows下会有异常。因为我们的spark程序要从hdfs中读数据,所以它要用到hadoop的InputFormat来读数据,如果要在windows下面进行本地调试,需要做一些事情。我们知道hadoop要压缩和解压缩,那么压缩和解压缩所需要的都是一些c或c++编写的库,而这些c或c++编写的库文件是不跨平台的。所以要在windows下面调试就必须先把这些库安装好。我们建议在linux下面进行调试,如果你没有Mac系统的话,可以在linux虚拟机上安装一个idea开发工具?使用Linux的图形界面来调试。下面是完整的代码:import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object MobileLocation { def main(args: Array[String]) { val conf = new SparkConf().setAppName("MobileLocation").setMaster("local[2]") val sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile(args(0)) //切分 //lines.map(_.split(",")).map(arr => (arr(0), arr(1).toLong, arr(2), args(3))) val splited = lines.map(line => { val fields = line.split(",") val mobile = fields(0) val lac = fields(2) val tp = fields(3) val time = if(tp == "1") -fields(1).toLong else fields(1).toLong //拼接数据 ((mobile, lac), time) }) //分组聚合 val reduced : RDD[((String, String), Long)] = splited.reduceByKey(_+_) val lmt = reduced.map(x => { //(基站,(手机号, 时间)) (x._1._2, (x._1._1, x._2)) }) //连接 val lacInfo: RDD[String] = sc.textFile(args(1)) //整理基站数据 val splitedLacInfo = lacInfo.map(line => { val fields = line.split(",") val id = fields(0) val x = fields(1) val y = fields(2) (id, (x, y)) }) //连接jion val joined: RDD[(String, ((String, Long), (String, String)))] = lmt.join(splitedLacInfo) PRintln(joined.collect().toBuffer) sc.stop() }}
新闻热点
疑难解答