spark/bin/spark-submit /--master spark://hadoop01:7077,spark://hadoop02:7077 /--executor-memory 512m --total-executor-cores 7 /--class cn.itcast.spark.WordCount //root/spark-1.0-SNAPSHOT.jar /hdfs://hadoop01:9000/wc /hdfs://hadoop01:9000/wc/out后面会给大家介绍如何在IDEA中既可以编译java程序,又可以编译scala程序,这就需要两个插件。其实在打包的时候,不用在pom文件制定main方法的全类名,因为我们可能会写很多的程序,我们可以在它执行的时候,动态的告诉它调哪个main方法。---使用java来开发一个简单spark的wordcount程序----------------------------------------------------------------import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavasparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Int;import scala.Tuple2;import java.util.Arrays;/** * Created by SYJ on 2016/10/22. */public class JavaWordCount { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount"); JavaSparkContext context = new JavaSparkContext(sparkConf); JavaRDD<String> lines = context.textFile(args[0]); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } }); JavaPairRDD<Integer, String> swapedPair = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception { return new 
Tuple2<Integer, String>(tp._2, tp._1); } }); JavaPairRDD<String, Integer> finalResult = swapedPair.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception { return tp.swap(); } }); finalResult.saveAsTextFile(args[1]); context.stop(); }}
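/*
 * 新闻热点 ("News") / 疑难解答 ("Troubleshooting"): navigation labels captured from the
 * source web page; not part of the program.
 */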