博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Twenty Newsgroups Classification任务之二seq2sparse(2)
阅读量:5747 次
发布时间:2019-06-18

本文共 5652 字,大约阅读时间需要 18 分钟。

接上篇,SequenceFileTokenizerMapper的输出文件在////tokenized-documents/part-m-00000文件即可查看,同时可以编写下面的代码来读取该文件(该代码是根据前面读出聚类中心点文件改编的),如下:

 

package mahout.fansy.test.bayes.read;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Writable;import org.apache.mahout.common.StringTuple;import org.apache.mahout.common.iterator.sequencefile.PathFilters;import org.apache.mahout.common.iterator.sequencefile.PathType;import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;public class ReadFromTokenizedDocuments {	/**	 * @param args	 */	private static Configuration conf;		public static void main(String[] args) {		conf=new Configuration();		conf.set("mapred.job.tracker", "ubuntu:9001");		String path="hdfs://ubuntu:9000/home/mahout/mahout-work-mahout0/20news-vectors/tokenized-documents/part-m-00000";				getValue(path,conf);	}		 /**     * 把序列文件读入到一个变量中;     * @param path 序列文件     * @param conf  Configuration     * @return  序列文件读取的变量     */    public static List
getValue(String path,Configuration conf){ Path hdfsPath=new Path(path); List
list = new ArrayList
(); for (Writable value : new SequenceFileDirValueIterable
(hdfsPath, PathType.LIST, PathFilters.partFilter(), conf)) { Class
valueClass = value.getClass(); if (valueClass.equals(StringTuple.class)) { StringTuple st = (StringTuple) value; list.add(st); } else { throw new IllegalStateException("Bad value class: " + valueClass); } } return list; }}

通过上面的文件可以读取到第一个StringTuple的单词个数有1320个(去掉stop words的单词数);

 

然后就又是一堆参数的设置,一直到267行,判断processIdf是否为非true,因为前面设置的是tfdif,所以这里进入else代码块,如下:

 

if (!processIdf) {        DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath, outputDir, tfDirName, conf, minSupport, maxNGramSize,          minLLRValue, norm, logNormalize, reduceTasks, chunkSize, sequentialAccessOutput, namedVectors);      } else {        DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath, outputDir, tfDirName, conf, minSupport, maxNGramSize,          minLLRValue, -1.0f, false, reduceTasks, chunkSize, sequentialAccessOutput, namedVectors);      }

这里直接调用DictionaryVectorizer的createTermFrequencyVectors方法,进入该方法(DictionaryVectorizer的145行),可以看到首先也是一些参数的设置,然后就到了startWordCounting方法了,进入这个方法可以看到这个是一个Job的基本设置,其Mapper、Combiner、Reducer分别为:TermCountMapper、TermCountCombiner、TermCountReducer,下面分别来看各个部分的作用(其实和最基本的wordcount很相似):

 

TermCountMapper,首先贴代码:

 

protected void map(Text key, StringTuple value, final Context context) throws IOException, InterruptedException {    OpenObjectLongHashMap
wordCount = new OpenObjectLongHashMap
(); for (String word : value.getEntries()) { if (wordCount.containsKey(word)) { wordCount.put(word, wordCount.get(word) + 1); } else { wordCount.put(word, 1); } } wordCount.forEachPair(new ObjectLongProcedure
() { @Override public boolean apply(String first, long second) { try { context.write(new Text(first), new LongWritable(second)); } catch (IOException e) { context.getCounter("Exception", "Output IO Exception").increment(1); } catch (InterruptedException e) { context.getCounter("Exception", "Interrupted Exception").increment(1); } return true; } });

该部分代码首先定义了一个Mahout开发人员定义的Map类,然后遍历value中的各个单词(比如第一个value中有1320个单词);当遇到map中没有的单词就把其加入map中,否则把map中该单词的数量加1更新原来的单词的数量,即for循环里面做的事情;然后就是forEachPair方法了,这里应该是复写了该方法?好像是直接新建了一个类然后把这个新建的类作为forEachPair的参数;直接看context.write吧,应该是把wordCount(这个变量含有每个单词和它的计数)中的各个单词和单词计数分别作为key和value输出;

 

然后是TermCountCombiner和TermCountReducer,这两个代码一样的和当初学习Hadoop入门的第一个例子是一样的,这里就不多说了。查看log信息,可以看到reduce一共输出93563个单词。

然后就到了createDictionaryChunks函数了,进入到DictionaryVectorizer的215行中的该方法:

 

List
chunkPaths = Lists.newArrayList(); Configuration conf = new Configuration(baseConf); FileSystem fs = FileSystem.get(wordCountPath.toUri(), conf); long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L; int chunkIndex = 0; Path chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex); chunkPaths.add(chunkPath); SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class); try { long currentChunkSize = 0; Path filesPattern = new Path(wordCountPath, OUTPUT_FILES_PATTERN); int i = 0; for (Pair
record : new SequenceFileDirIterable
(filesPattern, PathType.GLOB, null, null, true, conf)) { if (currentChunkSize > chunkSizeLimit) { Closeables.closeQuietly(dictWriter); chunkIndex++; chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex); chunkPaths.add(chunkPath); dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class); currentChunkSize = 0; } Writable key = record.getFirst(); int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8; currentChunkSize += fieldSize; dictWriter.append(key, new IntWritable(i++)); } maxTermDimension[0] = i; } finally { Closeables.closeQuietly(dictWriter); }

这里看到新建了一个Writer,然后遍历该文件的key和value,但是只读取key值,即单词,然后把这些单词进行编码,即第一个单词用0和它对应,第二个单词用1和它对应。

 

上面代码使用的dictWriter查看变量并没有看到哪个属性是存储单词和对应id的,所以这里的写入文件的机制是append就写入?还是我没有找到正确的属性?待查。。。

 

分享,快乐,成长

转载请注明出处: 

 

你可能感兴趣的文章
4-学会刷Wi-Fi模块固件(刷AT指令固件)
查看>>
ASP.NET Core 2 学习笔记(五)静态文件
查看>>
Button 使用Command 按钮置灰未更新
查看>>
PostgreSQL控制台以竖行显示
查看>>
CSS-Flex 布局教程:实例篇
查看>>
Java和移动端交互
查看>>
Java SSM 客户管理 商户 管理系统 库存管理 销售报表 项目源码
查看>>
排序优化——如何实现一个通用的、高性能的排序函数
查看>>
IDEA中css文件包红色下划线
查看>>
js中bind、call、apply函数的用法
查看>>
OC高效率52之多用GCD,少用performSelector系列方法
查看>>
sqoop导入关系型数据库-解密Sqoop
查看>>
语音分享应用ios源码项目
查看>>
性能测试结果分析
查看>>
Linux-dns基础知识和BIND的简单配置-1
查看>>
kafka Corrupt index found
查看>>
PoE
查看>>
《JAVA与模式》之适配器模式
查看>>
采用spring+structs+hibanate框架开发javaWeb项目
查看>>
数据库和表的管理
查看>>