摘要:本文主要向大家介绍机器学习入门之Spark学习 & 机器学习了,通过具体的内容向大家展现,希望对大家学习机器学习入门有所帮助。
本文主要向大家介绍机器学习入门之Spark学习 & 机器学习了,通过具体的内容向大家展现,希望对大家学习机器学习入门有所帮助。
启动Spark Shell
发现有的参数比如--executor-memory不能用,所以直接:
$ ./bin/spark-shell --master spark://10.117.146.12:7077
得到以下输出:
然后在 dashboard 也能够看到任务信息:
3.3 转换与操作
3.3.1 并行化集合例子演示
在该例子中通过parallelize方法定义了一个从1~10的数据集,然后通过map(_*2)对数据集每个数乘以2,接着通过filter(_%3==0)过滤被3整除的数字,最后使用toDebugString显示RDD的LineAge,并通过collect计算出最终的结果。
val num=sc.parallelize(1 to 10)
val doublenum = num.map(_*2)
val threenum = doublenum.filter(_ % 3 == 0)
threenum.toDebugString
threenum.collect
在下图运行结果图中,我们可以看到RDD的LineAge演变,通过paralelize方法建立了一个ParalleCollectionRDD,使用map()方法后该RDD为MappedRDD,接着使用filter()方法后转变为FilteredRDD。
但是我的spark-shell运行失败,不能成功。我觉得是我机器的shell版本太低导致的。
因为用下面的命令是成功的:
bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
所以决定用scala打包成jar来运行。需要再Intellij里面使用scala插件。在欢迎页面的configure里面选择plugin, 点击左下角的"Install Jetbrain plugin",然后在搜索框输入scala,能够看到scala插件。
但是有时候网络不好,或者fuck GFW的原因。需要先下下来,然后点最右边的 "Install plugin from disk"... Fuck GFW.
这时候,注意版本一定要合适,可以参考看到的scala插件图里面的版本。
安装好之后,新建项目的地方,就有Scala的选项了:
SBT应该是Scala的一个构建工具。 我们建项目的时候,先不选择SBT吧。
建好之后,出来了项目目录结构:
然后建一个package: com.spark.my,然后建一个文件(因为没找到scala class)Hello.scala。Intellij自动检测出没有配置Scala库。这时候,就要下一个scala的库。//www.scala-lang.org/download/ 这里下载了 scala-2.12.0.tgz,解压到 Data/Work/Installed里面。然后再Intellij的项目里面,配置scala类库到刚刚解压目录里面的lib目录即可。
然后再Hello.scala里面写代码:
package com.spark.my
object Hello {
def main(args: Array[String]): Unit = {
println("hello world")
}
}
运行,会稍微慢一点,出结果:
hello world
Process finished with exit code 0
然后改成Spark里面要用到的。需要先导入jar包。
File->Project Structure->Libraries->点绿色‘+‘->java->找到spark-assembly-1.0.0-hadoop2.2.0.jar->OK
按照类似的方法导入scala-compiler.jar, scala-library.jar, scala-reflect.jar 这些位于scala的安装目录下的lib目录。
但是发现Spark 2以上,不再包含 spark-assembly 这个jar了。也不知道依赖关系会多复杂,所以还是用maven来处理吧。
新建了一个maven项目,把刚刚的代码拷贝了一遍,是可以的。
另外,发现scala要从object入口,从class入口是不行的。
其中pom.xml内容:
<project xmlns="//maven.apache.org/POM/4.0.0"
xmlns:xsi="//www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="//maven.apache.org/POM/4.0.0 //maven.apache.org/xsd/maven-4.0.0.xsd">
目录结构:
Count.scala内容如下:
package com.spark.my
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by baidu on 16/11/28.
*/
object Count {
def main(args: Array[String]): Unit = {
//println("hello");
val conf = new SparkConf()
val sc = new SparkContext(conf)
val num=sc.parallelize(1 to 10)
val doublenum = num.map(_*2)
val threenum = doublenum.filter(_ % 3 == 0)
threenum.toDebugString
threenum.collect
}
}
配置artifacts
File->Project Structure->Artifacts->点绿色‘+‘->jar->From modules ...->在Main Classes中输入HdfsWC->OK
如下图所示将Extracted xxxx选中,点红色‘-‘,将这些移除->OK
然后发现报错,找不到xxx类。所以只好把所有的jar都打包进去(同时确保了jar包的版本都正确)。打包好是一个90多M的jar包,拷贝到机器上。
同时加上一个打印语句
println("threenum: %s".format(threenum.toDebugString))
加在了 threenum.collect 前面。看看效果。
结果:可以打印:
threenum: (2) MapPartitionsRDD[2] at filter at Count.scala:19 []
| MapPartitionsRDD[1] at map at Count.scala:17 []
| ParallelCollectionRDD[0] at parallelize at Count.scala:15 []
然后再加对输出数组的打印:
threenum.collect
println("threenum size=" + threenum.count())
for (elem <- threenum) {
print(elem + ",")
}
println("all elements done.")
输出结果:
16/11/28 03:51:21 INFO scheduler.DAGScheduler: Job 1 finished: count at Count.scala:25, took 0.038559 s
threenum size=3
result:6
result:12
result:18
all elements done.
16/11/28 03:51:21 INFO spark.SparkContext: Invoking stop() from shutdown hook
以下语句和collect一样,都会触发作业运行
num.reduce (_ + _)
num.take(5)
num.first
num.count
num.take(5).foreach(println)
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标人工智能机器学习频道!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号