首页 > 学院 > 开发设计 > 正文

跟天齐老师学Spark(5)--使用IDEA开发Spark程序

2019-11-11 03:18:00
字体:
来源:转载
供稿:网友
使用IDEA开发spark程序:补全的快捷键:"Ctrl+Alt+v"下面是提交spark程序到spark集群上运行的命令:
spark/bin/spark-submit /--master spark://hadoop01:7077,spark://hadoop02:7077 /--executor-memory 512m --total-executor-cores 7 /--class cn.itcast.spark.WordCount //root/spark-1.0-SNAPSHOT.jar /hdfs://hadoop01:9000/wc /hdfs://hadoop01:9000/wc/out后面会给大家介绍如何在IDEA中既可以编译java程序,又可以编译scala程序,这就需要两个插件。其实在打包的时候,不用在pom文件制定main方法的全类名,因为我们可能会写很多的程序,我们可以在它执行的时候,动态的告诉它调哪个main方法。---使用java来开发一个简单spark的wordcount程序----------------------------------------------------------------
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavasparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Int;import scala.Tuple2;import java.util.Arrays;/** * Created by SYJ on 2016/10/22. */public class JavaWordCount {    public static void main(String[] args) {        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");        JavaSparkContext context = new JavaSparkContext(sparkConf);        JavaRDD<String> lines = context.textFile(args[0]);        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {            @Override            public Iterable<String> call(String line) throws Exception {                return Arrays.asList(line.split(" "));            }        });        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {            @Override            public Tuple2<String, Integer> call(String word) throws Exception {                return new Tuple2<String, Integer>(word, 1);            }        });        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {            @Override            public Integer call(Integer i1, Integer i2) throws Exception {                return i1 + i2;            }        });        JavaPairRDD<Integer, String> swapedPair = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {            @Override            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {                return new Tuple2<Integer, String>(tp._2, tp._1);            }        });        JavaPairRDD<String, Integer> finalResult = swapedPair.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {            @Override            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {                return tp.swap();            }        });        finalResult.saveAsTextFile(args[1]);        context.stop();    }}
上一篇:spring中jar包依赖

下一篇:活动选择

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表