第十周,大數(shù)據(jù)建模學(xué)習(xí)應(yīng)用于模具行業(yè)的培訓(xùn)班
早上, 很早到書城, 今天最后一天培訓(xùn), 還是在 SPARK 復(fù)習(xí)。
開(kāi)發(fā) SPARK 使用 scala 、java 、 python 、 api 以及 shell 語(yǔ)言開(kāi)發(fā)的 搜索引擎。
缺點(diǎn): 不適合 web服務(wù) , dao層 、web爬蟲
優(yōu)點(diǎn): 伯克利數(shù)據(jù)分析 生態(tài)圈 , 機(jī)器學(xué)習(xí) , 數(shù)據(jù)挖掘, 數(shù)據(jù)庫(kù), 信息檢索, 自然語(yǔ)言處理, 語(yǔ)音識(shí)別
以 spark core 為核心 , 從 hdfs , amazon s3 hbase 等持久讀取數(shù)據(jù)
SPARK SQL 基于內(nèi)存的,
查看數(shù)據(jù)庫(kù)
這個(gè)在前端也能看到,點(diǎn)擊進(jìn)去, 詳細(xì)說(shuō)明。
HIVE 不支持事物的, 增刪改查。 ACRD 使用
在那一臺(tái)機(jī)器完成的, 可以看看
提交的人物, 由系統(tǒng)分配節(jié)點(diǎn)去跑。
HIVE 主要做 數(shù)據(jù)倉(cāng)庫(kù) DW ,
插入數(shù)據(jù)完成, 做一下 查詢數(shù)據(jù)
將 HIVE 的內(nèi)核, 改成 SPARK 以后, 提升速度 80倍
上面是 HOVE 的插入和查詢的過(guò)程。
接著跑 SPARK SQL 的過(guò)程
所有的 數(shù)據(jù)庫(kù)和表, 都是存儲(chǔ)在 HDFS 上的。
現(xiàn)在要用 SPARK SQL 來(lái)執(zhí)行。
show database
use
select
這個(gè)是 SPARK的管理頁(yè)面
DAG 的有向性的結(jié)合圖
DAG Scheduler
RDD 經(jīng)過(guò) 4個(gè) 聚合步驟后, 形成一個(gè) DAG , 多個(gè) DAG 組合編程 DAG計(jì)劃表
下面是 scala 寫的
要統(tǒng)計(jì)下面文件中的個(gè)數(shù)
上述雖然是 一行代碼, 但是也顯示的淋漓盡致
運(yùn)行的狀況
管理頁(yè)面的結(jié)果
環(huán)境配置
SPARK 單詞統(tǒng)計(jì)的 DEMO , 接下來(lái)再看看
SPARK STREM 消費(fèi)kaFka 的數(shù)據(jù), 管理者確定是 ZooKeeper
下午講解 Klin 做一些報(bào)表的功能, SPARK
要搭建不同的集群, 由原來(lái)的 store 的程序, 遷移到 spark 上來(lái), Flink 是把交互式查詢和實(shí)時(shí)計(jì)算合在一起了。
Flink 也是用 Scala 來(lái)寫。
1、客戶端提交作業(yè)后,啟動(dòng)Driver,Driver是Spark作業(yè)的Master(也就是通過(guò)Driver來(lái)啟動(dòng)Receiver,定時(shí)去啟動(dòng)任務(wù)的處理,
注意的是,驅(qū)動(dòng)啟動(dòng)任務(wù)會(huì)受前一個(gè)任務(wù)執(zhí)行的影響。也就是前一個(gè)任務(wù)沒(méi)有執(zhí)行完成后,是不會(huì)啟動(dòng)后邊的任務(wù)的。
所以,注意你的streaming的執(zhí)行時(shí)間,絕對(duì)不要超過(guò)Recive數(shù)據(jù)的時(shí)間)
2、每個(gè)作業(yè)包含多個(gè)Executor,每個(gè)Executor以線程的方式運(yùn)行task,Spark Streaming至少包含一個(gè)Receiver task。
(一個(gè)Executor就是一個(gè)spark進(jìn)程,在yarn中就是一個(gè)container,這個(gè)大家應(yīng)該知道。
然后Receiver task是在driver中創(chuàng)建的,我理解一個(gè)Receiver是運(yùn)行在一個(gè)Executor中的。
然后如果想要?jiǎng)?chuàng)建多個(gè)Receiver,那么需要大概這樣做(1 to 10).map(_.createStream…),這樣就能創(chuàng)建10個(gè)receiver task啦。
注意這個(gè)數(shù)量當(dāng)然不能超過(guò)你的結(jié)點(diǎn)數(shù)量啦。
還有個(gè)問(wèn)題,通常使用kafka比較合適,因?yàn)閗afka是stream向kafka來(lái)poll數(shù)據(jù)。
而他媽的flume默認(rèn)只支持pull,如果想支持poll,那需要定制sink,那真是太惡心了。)
3、Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報(bào)給Driver,然后備份到另外一個(gè)Executor上。
(默認(rèn)情況下接受數(shù)據(jù)是200毫秒生成一個(gè)block,我理解一個(gè)block應(yīng)該是一個(gè)partition?
這個(gè)還不確定,需要對(duì)照源代碼看一下;
然后會(huì)把生成的Block隨機(jī)扔到不同的Executor,同時(shí),driver去派發(fā)任務(wù)時(shí),也會(huì)找到就近的Executor。
我理解,節(jié)點(diǎn)中的所有executor都應(yīng)該會(huì)有數(shù)據(jù)才對(duì))
4、ReceiverTracker維護(hù)Receiver匯報(bào)的BlockId。
(這個(gè)ReceiverTracker應(yīng)該是維護(hù)在Driver中,Driver會(huì)根據(jù)維護(hù)的這些數(shù)據(jù)塊進(jìn)行任務(wù)的派發(fā))
5、Driver定時(shí)生成JobGenerator,根據(jù)DStream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。
6、JobScheduler負(fù)責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,
每個(gè)stage包含一到多個(gè)task。(我記得DAGScheduler會(huì)對(duì)任務(wù)做一層優(yōu)化)
7、TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護(hù)task的運(yùn)行狀態(tài)。
8、當(dāng)tasks,stages,jobset完成后,單個(gè)batch(批處理)才算完成。
具體流程:
客戶端提交作業(yè)后啟動(dòng)Driver,Driver是spark作業(yè)的Master。
每個(gè)作業(yè)包含多個(gè)Executor,每個(gè)Executor以線程的方式運(yùn)行task,Spark Streaming至少包含一個(gè)receiver task。
Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報(bào)給Driver,然后備份到另外一個(gè)Executor上。
ReceiverTracker維護(hù)Reciver匯報(bào)的BlockId。
Driver定時(shí)啟動(dòng)JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。
JobScheduler負(fù)責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個(gè)stage包含一到多個(gè)task。
TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護(hù)task的運(yùn)行狀態(tài)。
當(dāng)tasks,stages,jobset完成后,單個(gè)batch才算完成。
--------------------------------------------------- 第十周 盧老師 講課筆記
Spark Streaming原理:
Spark Streaming是基于spark的流式批處理引擎,其基本原理是把輸入的數(shù)據(jù)以某一時(shí)間間隔批量的處理,當(dāng)批處理縮短到秒級(jí)時(shí),便可以用于處理實(shí)時(shí)數(shù)據(jù)流。
kafka
flume HDFS
hdfs -------> spark streaming --------> DataBase
zeromq | Dashboards
|
twitter |
|
input data streaming -->sparkstreaming-->batches of input data--->spark engine --->batches of processed data
live input stream --> spark Streaming --->t t t --->spark job --->results
sparkstreaming架構(gòu)
spark streaming作業(yè)流程:
Driver:
receivertracker
jobGenerator
JobScheduler
DagScheduler
TaskScheduler
BlockManagerMaster
Executor:
Receiver
BlockManagerSlave
Executor:
Task
BlockManagerSlave
運(yùn)行機(jī)制:
1、客戶端提交作業(yè)后啟動(dòng)一個(gè)Driver,Driver是spark作業(yè)的Master。
2、每個(gè)作業(yè)包含多個(gè)Executor,每個(gè)Executor以線程的方式運(yùn)行Task,Spark Streaing至少包含一個(gè)receiver task。
3、Receiver接收數(shù)據(jù)后生成Block,并把Blockid匯報(bào)給Driver,然后備份到另外一個(gè)Executor上
4、ReceiverTracker維護(hù)Recivver匯報(bào)的BlockId。
5、Driver定時(shí)啟動(dòng)JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建JobSet,交給JobScheduler.
6、JobScheduler負(fù)責(zé)調(diào)度JobSet,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個(gè)Stages包含一個(gè)到多個(gè)Task。
7、TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護(hù)task的運(yùn)行狀態(tài)。
8、當(dāng)tasks、stages、jobset完成后,單個(gè)batch才算完成。
Spark Streaming消費(fèi)Kafka數(shù)據(jù):
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
Spark統(tǒng)計(jì)單詞次數(shù):
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
SparkSQL數(shù)據(jù)查詢:
// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
val url =
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
val df = sqlContext
.read
.format("jdbc")
.option("url", url)
.option("dbtable", "people")
.load()
// Looks the schema of this DataFrame.
df.printSchema()
// Counts people by age
val countsByAge = df.groupBy("age").count()
countsByAge.show()
// Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
Spark機(jī)器學(xué)習(xí)實(shí)現(xiàn)邏輯回歸算法:
// Every record of this DataFrame contains the label and
// features represented by a vector.
val df = sqlContext.createDataFrame(data).toDF("label", "features")
// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
val lr = new LogisticRegression().setMaxIter(10)
// Fit the model to the data.
val model = lr.fit(df)
// Inspect the model: get the feature weights.
val weights = model.weights
// Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
-------------------------------------------------------------------------------------------------------
最后做一次復(fù)習(xí) , 有始有終。
-------在最下面增加 2行代碼。
如果有 幾千臺(tái)服務(wù)器, 必須配置免登錄, 生成的密鑰, 追加到這個(gè)文件中去。
hadoop 的官方網(wǎng)站, 有非常詳細(xì)的內(nèi)容。
多種方式都可以查看是不是啟動(dòng)成功
50070端口, HDFS的端口
搭建完成后,
下面是 Yarn 的配置信息
需要 shuffle 的, 需要增加 各類 shuffle .
FIFO FAIR 先進(jìn)先出 公平調(diào)度
上面是 Yarn 的配置文件, 這是第三周講的
第四周講 Hbase , 是 google 的大表實(shí)現(xiàn)
單機(jī)版和集群版的不同
第5周 、 第六周
一切階是對(duì)像
以上是 5周 Scala