【Java精品源码栏目提醒】:网学会员--在 Java精品源码编辑为广大网友搜集整理了:nutch 1.2 源代码解析 - 其它资料绩等信息,祝愿广大网友取得需要的信息,参考学习。
Nutch 源代码浅析(一)由两个部分组成: Crawling 和 Searching,Crawling 部分负责抓取网页和建立索引,而Searching 则是将 Crawling 过程中建立的 index 和 segments 根据用户的搜索请求返回结果。
本文分析的是 apache-nutch-1.2。
Nutch 的主流程在 bin/目录下有一个 nutch 的脚本,这个脚本将被安装到安装目录的 bin 目录下,大体作用根据本脚本的路径得到 nutch 的安装目录(其中考虑到了 nutch 是 soft link 的情况),而后JRE 的堆栈大小,还有 classpath 信息。
最关键的代码如下:1. figure out which class to run2. if quotCOMMANDquot quotcrawlquot then3. CLASSorg.apache.nutch.crawl.Crawl4. elif quotCOMMANDquot quotinjectquot then5. CLASSorg.apache.nutch.crawl.Injector6. elif quotCOMMANDquot quotgeneratequot then7. CLASSorg.apache.nutch.crawl.Generator此 段 代 码 则 从 该 脚 本 进 入 了
java 代 码 。
我 们 来 看 看 crawl 是 如 何 工 作 的 。
打 开src/
java/org/apache/nutch/crawl/Crawl.
java ,Crawl 的 main 函数如下:从命令行解析传递进来的参数,可以看到 rootUrlDir 的处理比较特殊,因为其他参数都是配对出现(例如-dir 后面紧跟着的是指明的文件夹路径) ,而 rootUrlDir 则是最后落单的那个参数。
1. for int i 0 i lt args. length i 2. if quot-dirquot.equalsargsi 3. dir new Pathargsi14. i5. else if quot-threadsquot.equalsargsi 6. threads Integer. parseIntargsi17. i8. else if quot-depthquot.equalsargsi 9. depth Integer. parseIntargsi110. i11. else if quot-topNquot.equalsargsi 12. topN Integer. parseIntargsi113. i14. else if quot-solrquot.equalsargsi 15. indexerName quotsolrquot16. solrUrl StringUtils. lowerCaseargsi 117. i18. else if argsi null 19. rootUrlDir new Pathargsi20. 21. 下面就开始进入正题了:1. 首先调用 Injector 的 inject 函数将 rootUrlDir 目录下的所有文件中的网址注入到 crawlDb数据库。
2. 根据指定的抓取深度,循环做以下三件事: 调用 Generator 的 generate 函数,产生 URL 的 fetchlist,并放在一个或者多个 segments中。
调用 Fetcher 的 fetch 函数,获取 segs0所制定的 segement,并 Parsing,parsing 操作可以是集成在了 fetch 这一步,也可以是单独下一步来做。
这儿值得注意的是,只取了第一个seg,而数组中有可能有多个 seg,没有 fetch。
调用 CrawlDb 的 update 函数,将 segments 中的 fetch 和 parsing 的数据合并到数据库(crawlDb)中。
这里取的是 segs 数组,我觉得这儿可能是有问题的。
3. 建立并合并索引。
1. // initialize crawlDb2. injector.injectcrawlDb rootUrlDir3. int i4. for i 0 i lt depth i // generate new segment5. Path segs generator.generatecrawlDb segments -1 topN System6. . currentTimeMillis7. if segs null 8. LOG.infoquotStopping at depthquot i quot - no more URLs to fetch.quot9. break10. 11. fetcher.fetchsegs0 threads org.apache.nutch.fetcher.Fetcher. isParsingconf // fetchit12. if Fetcher.isParsingjob 13. parseSegment.parsesegs0 // parse it if needed14. 15. crawlDbTool.updatecrawlDb segs true true // update crawldb16. 17. if i gt 0 18. linkDbTool.invertlinkDb segments true true false // invert links19. // index dedup amp merge20. FileStatus fstats fs.listStatussegments HadoopFSUtil.getPassDirectoriesFilterfs21. if isSolrIndex 22. SolrIndexer indexer new SolrIndexerconf23. indexer.indexSolrsolrUrl crawlDb linkDb24. Arrays. asListHadoopFSUtil.getPaths fstats25. 26. else 27.28. DeleteDuplicates dedup new DeleteDuplicatesconf29. ifindexes null 30. // Delete old indexes31. if fs.existsindexes 32. LOG.info quotDeleting old indexes: quot indexes33. fs.deleteindexes true34. 35. // Delete old index36. if fs.existsindex 37. LOG.info quotDeleting old merged index: quot index38. fs.deleteindex true39. 40. 41.42. Indexer indexer new Indexerconf43. indexer.indexindexes crawlDb linkDb44. Arrays. asListHadoopFSUtil.getPaths fstats45.46. IndexMerger merger new IndexMergerconf47. ifindexes null 48. dedup.dedup new Path indexes 49. fstats fs.listStatusindexes HadoopFSUtil. getPassDirectoriesFilterfs50. merger.mergeHadoopFSUtil. getPathsfstats index tmpDir51. 52. 53.54. else 55. LOG.warnquotNo URLs to fetch - check your seed list and URL filters.quot56. 57. if LOG.isInfoEnabled LOG.info quotcrawl finished: quot dir Nutch 源代码浅析(二) (Crawl 中 Inject 的工作流程)Inject 操作的入口函数在 org.apache.nutch.crawl.Injector 中,main 函数调用 run 函数,最后进入 inject 函数:1、产生一个以随机数结尾的文件夹 tempDir。
2、运行一个 Hadoop 的 MapRed JOb (sortJob) 将 urls 目录下的所有网址读出, , 并保存到 3、OutputCollectorltTextCrawlDatumgt对象中,而后将该对象序列化到 tempDir 文件夹下。
3、运行另外一个 Hadoop 的 MapRed Job(mergeJob) ,将 tempDir 新产生数据和 crawlDb 已有的老数据合并,合并过程如果有某些网址新旧冲突,将保留已有的老数据,最终结果将保存于 crawlDb 下的一个随机数命名的文件夹 newCrawlDb 中(有别于 1 中的名称) 。
4、重命名 crawlDb 下的“current”文件夹为“old” ,如果“old”文件夹已经存在,将删除之,而后将 newCrawlDb 文件夹重名为quotcurrentquot,删除 tempDir。
public void injectPath crawlDb Path urlDir throws IOException SimpleDateFormat sdf new SimpleDateFormatquotyyyy-MM-dd HH:mm:ssquot long start System.currentTimeMillis if LOG.isInfoEnabled LOG.infoquotInjector: starting at quot sdf.formatstart LOG.infoquotInjector: crawlDb: quot crawlDb LOG.infoquotInjector: urlDir: quot urlDir // 默认会在当前文件夹产生类似于 inject-temp-853856658 的目录,数位为随机数 Path tempDir new PathgetConf.getquotmapred.temp.dirquot quot.quot quot/inject-temp-quot Integer.toStringnew Random.nextIntInteger.MAX_VALUE // map text input file to a lturlCrawlDatumgt file if LOG.isInfoEnabled LOG.infoquotInjector: Converting injected urls to crawl db entries.quot JobConf sortJob new NutchJobgetConf sortJob.setJobNamequotinject quot urlDir // 讲命令行中传入的 urls 目录作为 sortJob 的输入,其后将会读取其中的内容 FileInputFormat.addInputPathsortJob urlDir // InjectMapper 会 根 据 urls 目 录 下 所 有 文 件 的 内 容 , 产 生 OutputCollectorltTextCrawlDatumgt对象。
// 其中 Text 表示 url, CrawlDatum 对象则存放着 url 相关的状态信息, 而 稍后会详加介绍。
// 有两种方法设置 Mapper一中通过 setMapperClass,另一种通过 setMapRunnerClass,后法在 Fetcher 中有使用, // 优点可以以线程启动 sortJob.setMapperClassInjectMapper.class // 将输出的结构存放的 tempDir 目录下 FileOutputFormat.setOutputPathsortJob tempDir // 采用 Sequence 的方式存储文件 sortJob.setOutputFormatSequenceFileOutputFormat.class // 设置 Key 和 Value 的 Class,和 InjectMapper 的 map 输出 OutputCollectorltTextCrawlDatumgt相对应的 // 对所有 Hadoop 中的 Job 都必须符合这一规则。
sortJob.setOutputKeyClassText.class sortJob.setOutputValueClassCrawlDatum.class sortJob.setLongquotinjector.current.timequot System.currentTimeMillis // 调用 Hadoop 中的 MapRed 实现,运行 sortJob JobClient.runJobsortJob // merge with existing crawl db if LOG.isInfoEnabled LOG.infoquotInjector: Merging injected urls into crawl db.quot // 将根据 urls 目录下产生的临时数据 tempDir 合并到 crawlDb 已有的数据中 // mergeJob 的具体设置在 CrawlDb.createJob中实现 JobConf mergeJob CrawlDb.createJobgetConf crawlDb FileInputFormat.addInputPathmergeJob tempDir // 调用 InjectReducer 中的 reduce 函数,实现合并 mergeJob.setReducerClassInjectReducer.class JobClient.runJobmergeJob // 备份 crawlDb 中的老数据,将新的结果替换为新的数据 CrawlDb.installmergeJob crawlDb // clean up // 清楚中间数据 FileSystem fs FileSystem.getgetConf fs.deletetempDir true long end System.currentTimeMillis LOG.infoquotInjector: finished at quot sdf.formatend quot elapsed: quot TimingUtil.elapsedTimestart end而后我们来看看 InjectMapper 是如何实现 map 操作的,会发现,起始 url 可以有两个参数,并且在此函数中引入了 urlfilter 和 scorefilter// key 在此没用到,而 value 中这是从 urls 文件下文件中读出的一行字符串// 输出会追加到 output 中,Text 是 url,而 CrawlDatum 为该 url 想对应的状态等信息public void mapWritableComparable key Text value OutputCollectorltText CrawlDatumgt output Reporter reporter throws IOException String url value.toString // value is line of text if url null ampamp url.trim.startsWithquotquot / Ignore line that start with / return // 在 url 文件中描述一个 url 时,需要以其网址开头,需要以 tab 键隔开,// 目前支持在其后追加两种参数(可同时存在) ,quotnutch.scorequot和quotnutch.fetchIntervalquot,// 这样我们可以根据需要对某个 url 单独设置 score 和 fetchInterval,// 这两个参数将会作为 metadata 存入 CrawlDatum 对象中// if tabs : metadata that could be stored// must be namevalue and separated by /tfloat customScore -1fint customInterval intervalMapltStringStringgt metadata new TreeMapltStringStringgtif url.indexOfquot/tquot-1 String splits url.splitquot/tquot url splits0 for int s1sltsplits.lengths // find separation between name and value int indexEquals splitss.indexOfquotquot if indexEquals-1 // skip anything without a continue String metaname splitss.substring0 indexEquals String metavalue splitss.substringindexEquals1 if metaname.equalsnutchScoreMDName try customScore Float.parseFloatmetavalue catch NumberFormatException nfe else if metaname.equalsnutchFetchIntervalMDName try customInterval Integer.parseIntmetavalue catch NumberFormatException nfe else metadata.putmetanamemetavalue try url urlNormalizers.normalizeurl URLNormalizers.SCOPE_INJECT // 注意,这就是 urlfilter 的入口点,在 inject 到 crawlDb 之前需要经历 urlfilter 的筛选, // 如果不符合筛选条件,将返回 null,对此 url 将不做后续的工作 url filters.filterurl // filter the url catch Exception e if LOG.isWarnEnabled LOG.warnquotSkipping quot urlquot:quote url nullif url null // if it passes value.seturl // collect it CrawlDatum datum new CrawlDatumCrawlDatum.STATUS_INJECTEDcustomInterval datum.setFetchTimecurTime // now add the metadata IteratorltStringgt keysIter metadata.keySet.iterator while keysIter.hasNext String keymd keysIter.next String valuemd metadata.getkeymd datum.getMetaData.putnew Textkeymd new Textvaluemd if customScore -1 datum.setScorecustomScore else datum.setScorescoreInjected try // 注意,这儿是 score filter 的入口点 scfilters.injectedScorevalue datum catch ScoringFilterException e if LOG.isWarnEnabled LOG.warnquotCannot filter injected score for url quot url quot using default quot e.getMessage quotquot // 将结果添加到 output 中 output.collectvalue datum 再看看 InjectReducer 是如何实现 reduce 的,简而言之,新旧合并,新旧冲突时,保留老的// reduce 顾名思义就是了处理一对多的情况,为了处理新老之间的合并而已,// 所以从函数的入参可以看出一些端倪,一个 key,但一组 values,输出依然是// OutputCollectorltText CrawlDatumgtpublic void reduceText key IteratorltCrawlDatumgt values OutputCollectorltText CrawlDatumgt output Reporter reporter throws IOException boolean oldSet false while values.hasNext CrawlDatum val values.next if val.getStatus CrawlDatum.STATUS_INJECTED injected.setval injected.setStatusCrawlDatum.STATUS_DB_UNFETCHED else // 如果老的数据里已经有了这条 url 的记录,将会保留老的 old.setval oldSet true CrawlDatum res null // 以老为大 if oldSet res old // dont overwrite existing value else res injected // 加入到输出中 output.collectkey res至此,Inject 完成。
Nutch 源代码浅析(三) (Crawl 中 generate 的工作流程)generate 所完成的工作:1、numLists 为 numFetchers,Fetcher 的个数,每个 fetch task 对应一个 partition。
如果在 conf文件中把quotmapred.job.trackerquot设置为quotlocalquot的话,将会把 numLists 重写为 1。
2、将 url 分配到不同的 tempDir 中不同的 fetchlist 下,例如quotfetchlist-1quot表示第一个 fetchlist。
3、读取 tempDir 下的以quotfetchlist-quot开头的文件夹,调用 partitionSegment对每一个文件夹对应产生一个 partition。
public Path generatePath dbDir Path segments int numLists long topN long curTime boolean filter boolean norm boolean force int maxNumSegments throws IOException Path tempDir new PathgetConf.getquotmapred.temp.dirquot quot.quot quot/generate-temp-quot System.currentTimeMillis Path lock new PathdbDir CrawlDb.LOCK_NAME FileSystem fs FileSystem.getgetConf // force 强制创建 lock 文件 LockUtil.createLockFilefs lock force SimpleDateFormat sdf new SimpleDateFormatquotyyyy-MM-dd HH:mm:ssquot long start System.currentTimeMillis LOG.infoquotGenerator: starting at quot sdf.formatstart LOG.infoquotGenerator: Selecting best-scoring urls due for fetch.quot LOG.infoquotGenerator: filtering: quot filter LOG.infoquotGenerator: normalizing: quot norm if topN Long.MAX_VALUE LOG.infoquotGenerator: topN: quot topN if quottruequot.equalsgetConf.getGENERATE_MAX_PER_HOST_BY_IP LOG.infoquotGenerator: GENERATE_MAX_PER_HOST_BY_IP will be ignored usepartition.url.mode insteadquot // map to inverted subset due for fetch sort by score JobConf job new NutchJobgetConf job.setJobNamequotgenerate: select from quot dbDir // numLists 为 numFetchers,Fetcher 的个数,每个 fetch task 对应一个 partition if numLists -1 // for politeness make numLists job.getNumMapTasks // a partition per fetch task if quotlocalquot.equalsjob.getquotmapred.job.trackerquot ampamp numLists 1 // override LOG.infoquotGenerator: jobtracker is local generating exactly one partition.quot numLists 1 // 设置 job 的 local 参数job.setLongGENERATOR_CUR_TIME curTime// record real generation timelong generateTime System.currentTimeMillisjob.setLongNutch.GENERATE_TIME_KEY generateTimejob.setLongGENERATOR_TOP_N topNjob.setBooleanGENERATOR_FILTER filterjob.setBooleanGENERATOR_NORMALISE normjob.setIntGENERATOR_MAX_NUM_SEGMENTS maxNumSegments// crawlDb 的 current 作为输入FileInputFormat.addInputPathjob new PathdbDir CrawlDb.CURRENT_NAMEjob.setInputFormatSequenceFileInputFormat.class// 设置 job 的 mapper、partitioner 和 Reducer,都在 Selector 实现了它们所需要的接口job.setMapperClassSelector.classjob.setPartitionerClassSelector.classjob.setReducerClassSelector.class// 输出到 tempDir 中,设置了 key 和 value 的 class,另外加了一个 KeyComparatorClassFileOutputFormat.setOutputPathjob tempDirjob.setOutputFormatSequenceFileOutputFormat.classjob.setOutputKeyClassFloatWritable.classjob.setOutputKeyComparatorClassDecreasingFloatComparator.classjob.setOutputValueClassSelectorEntry.classjob.setOutputFormatGeneratorOutputFormat.classtry JobClient.runJobjob catch IOException e throw e// read the subdirectories generated in the temp// output and turn them into segmentsListltPathgt ge.