SparkonHive還是HiveonSpark?

數據智能相依偎 2024-04-14 02:54:44
我們在前面的文章中說,Hadoop的底層計算引擎MapReduce不是唯一和必須的,其他計算引擎如tez、Spark、storm可以將其替換,以提升計算性能。而在lanbda架構中,Spark又可以作爲批計算引擎,對數據進行集成計算。以至于有很多小夥伴一下子有了疑惑,這兩個Spark到底是啥情況,如何整合,這種架構到底是Hive on Spark還是Spark on Hive?今天我們就來分析一下。 一、用Spark 替換MapReduce——Hive on Spark 相比MapReduce數據計算每次讀寫數據都進行磁盤IO,Spark是一個快速、通用的內存計算系統,它將計算的中間步驟存儲在內存中,每次數據的讀寫都在內存中進行,因此提供了比MapReduce更高效、更方便的數據處理方式。 我們先來看MapReduce程序,一個Mapreduce程序主要包括三部分:Mapper類、Reducer類、執行類。 首先添加依賴: org.apache.hadoop hadoop-client 3.3.0 junit junit 4.12 test org.slf4j slf4j-api 1.7.30 junit junit 3.8.2 junit junit 4.12 compile Mapper類: import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.util.StringUtils;import java.io.IOException;public WordCountMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //如果當前數據不爲空 if (value!=null){ //獲取每一行的數據 String line = value.toString(); //將一行數據根據空格分開// String[] words = line.split(" "); String[] words = StringUtils.split(line,' ');//hadoop的StringUtils.split方法對大數據來說比Java自帶的擁有更好的性能 //輸出鍵值對 for (String word : words) { context.write(new Text(word),new LongWritable(1)); } } }}Reducer類:import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public WordCountReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //累加單詞的數量 long sum = 0; //遍曆單詞計數數組,將值累加到sum中 for (LongWritable value : values) { sum += value.get(); } //輸出每次最終的計數結果 context.write(key,new LongWritable(sum)); }}執行類: import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public WordCountRunner extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(),new WordCountRunner(),args); } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCountRunner.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //設置統計文件輸入的路徑,將命令行的第一個參數作爲輸入文件的路徑 //讀取maven項目下resources目錄的文檔 String path = getClass().getResource("/words.txt").getPath(); FileInputFormat.setInputPaths(job,path); //設置結果數據存放路徑,將命令行的第二個參數作爲數據的輸出路徑 //輸出目錄必須不存在!!! FileOutputFormat.setOutputPath(job,new Path("./output")); return job.waitForCompletion(true) ? 0 : 1; }}這個程序基本的操作可以總結爲: map: (K1,V1) ➞ list(K2,V2) reduce: (K2,list(V2)) ➞ list(K3,V3) 第一步是分布式計算,各個分布節點進行節點上的map操作,第二步即是reduce操作,進行計算的彙總。 那,如果用spark去替換這個程序又是怎樣的呢? 添加Maven依賴: org.apache.spark spark-core_2.12 3.1.1 程序: import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import scala.Tuple2; public WordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 讀取輸入文件 JavaRDD input = sc.textFile("words.txt"); // 切分爲單詞 JavaRDD words = input.flatMap(s -> Arrays.asList(s.split(" ")).iterator()); // 轉換爲鍵值對 JavaPairRDD pairs = words.mapToPair(word -> new Tuple2<>(word, 1)); // 按鍵進行聚合 JavaPairRDD wordCounts = pairs.reduceByKey((a, b) -> a + b); // 輸出結果 wordCounts.foreach(t -> System.out.println(t._1 + ": " + t._2)); sc.stop(); }} 一句話總結:在Spark中,實際是把數據是通過彈性分布式數據集(RDD)來進行處理的。 二、用Spark 連接Hive——Spark on Hive 在這個操作前,要保證Spark知道MetaStore的IP和端口號,當然MetaStore必須是活躍的。 1.將hive-site.xml拷貝到spark安裝路徑conf目錄 cd /export/server/hive/confcp hive-site.xml /export/server/spark/conf/scp hive-site.xml root@bigdata02:/export/server/spark/conf/scp hive-site.xml root@bigdata3:/export/server/spark/conf/2.將mysql的連接驅動包拷貝到spark的jars目錄下 cd /export/server/hive/libcp mysql-connector-java-5.1.32.jar /export/server/spark/jars/scp mysql-connector-java-5.1.32.jar root@bigdata2:/export/server/spark/jars/scp mysql-connector-java-5.1.32.jar root@bigdata3:/export/server/spark/jars/3.在Hive中開啓MetaStore服務 修改 hive/conf/hive-site.xml新增如下配置,遠程模式部署metastore服務地址。 hive.metastore.uris thrift://master:9083 啓動MetaStore: nohup /export/server/hive/bin/hive --service metastore 2>&1 >> /var/log.log &測試SparkSQL cd /export/server/spark./bin/spark-shell如果這裏測試import org.apache.spark.sql.hive.HiveContext會報錯,說明我們的Spark版本並不能支持hive。這個時候就要重新下載支持的版本或者編譯Spark,讓它支持hive。編譯命令如下: ./dev/make-distribution.sh —tgz —name h27hive -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.1 -Phive -Phive-thriftserver -DskipTests 編譯後,重新安裝spark,啓動shell。 執行sparkSql即可。 三、總結 Hive on Spark 即是將hive查詢依賴的底層計算引擎mapreduce(Hadoop計算引擎)操作替換爲Spark RDD操作,改其爲內存計算模式。 Spark on Hive Spark通過Spark-SQL加載hive的配置文件,獲取到hive的元數據信息來操作Hivesql,操作hive,底層操作與這個無關。
0 阅读:2

數據智能相依偎

簡介:感謝大家的關注