### Connecting to a Hadoop cluster from Windows
1. Assume a Hadoop cluster has already been set up on the Linux machines.
2. On Windows, unpack the Hadoop tarball into a directory whose path contains no spaces, e.g. the root of the D: drive.
3. Configure the environment variables: HADOOP_HOME=D:\hadoop-2.7.7, and append %HADOOP_HOME%\bin to Path.
4. Download the two helper files built for a matching Hadoop version (download: https://github.com/steveloughran/winutils):
   - hadoop.dll: place it in C:\Windows\System32
   - winutils.exe: place it in %HADOOP_HOME%\bin
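If setting system-wide environment variables is inconvenient, the hadoop.home.dir property can also be set from code before any Hadoop classes are used. A minimal sketch, assuming the same D:\hadoop-2.7.7 directory from step 3 (with winutils.exe already in its bin folder):

```java
public class HadoopHomeSetup {
    public static void main(String[] args) {
        // Assumption: Hadoop was unpacked to D:\hadoop-2.7.7 and bin\winutils.exe is present.
        // This mirrors the HADOOP_HOME environment variable for the current JVM only.
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.7");
        // ...then build the Configuration / Job as in the WordCount example below
    }
}
```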
5. WordCount example:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author: LUGH1
 * @date: 2019-4-8
 * @description:
 */
public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // point the HDFS client at the cluster's NameNode
        conf.set("fs.defaultFS", "hdfs://192.168.88.130:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);

        job.setMapperClass(WdMapper.class);
        job.setReducerClass(WdReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/test/word.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/test/output"));

        boolean result = job.waitForCompletion(true);
        System.out.println("good job");
        System.exit(result ? 0 : 1);
    }
}

class WdMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // emit (word, 1) for every word in the line
        String line = value.toString();
        String[] split = line.split(" ");
        for (String word : split) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

class WdReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // sum the counts for each word
        int count = 0;
        for (IntWritable i : values) {
            count += i.get();
        }
        context.write(key, new IntWritable(count));
    }
}
```
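Note that the job fails at submission if the output directory already exists, so remove /test/output (hadoop fs -rm -r /test/output) before re-running. Once the job finishes, the counts can be inspected with hadoop fs -cat /test/output/part-r-00000 (the exact part-file name depends on the reducer).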
### Connecting to a Spark cluster from Windows

Key settings:
1. Set the master URL: conf.setMaster("spark://192.168.88.130:7077")
2. Set the location of the application jar: conf.setJars(List("hdfs://192.168.88.130:9000/test/sparkT-1.0-SNAPSHOT.jar"))

The sparkT-1.0-SNAPSHOT.jar above was packaged in IDEA and then uploaded to HDFS with hadoop fs -put.
Code:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object sparkTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("test").setMaster("spark://192.168.88.130:7077")
    // conf.set("spark.driver.host", "192.168.88.1")
    conf.setJars(List("hdfs://192.168.88.130:9000/test/sparkT-1.0-SNAPSHOT.jar"))
    val sc = new SparkContext(conf)

    // val path = "E:\\java_product\\test.txt"
    val rdd = sc.textFile("hdfs://192.168.88.130:9000/test/word.txt")
    // val rdd = sc.textFile("E:\\java_product\\test.txt")

    // split each line into words, count each word once, and sum per word
    val count = rdd.flatMap(line => line.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    count.collect().foreach(println) // .saveAsTextFile("hdfs://192.168.88.130:9000/test/wordoupt1")
  }
}
```
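conf.setJars is what lets the executors on the cluster load the application classes: the driver runs on the Windows machine, so the jar must be reachable from the workers, and pointing at a copy already uploaded to HDFS is a simple way to achieve that. The commented-out spark.driver.host setting can become necessary when the Windows host has several network interfaces and the executors cannot connect back to the driver on the address Spark picks by default.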