使用 MapReduce 读取 XML 文件 XML( 可扩展标记语言, 英语 :extensible Markup Language, 简称 : XML) 是一种标记语言, 也是行业标准数据交换交换格式, 它很适合在系统之间进行数据存储和交换 ( 话说 Hadoop H ive 等的配置文件就是 XML 格式的 ) 本文将介绍如何使用 MapReduce 来读取 XML 文件 但是 Had oop 内部是无法直接解析 XML 文件 ; 而且 XML 格式中没有同步标记, 所以并行地处理单个 XML 文件比较棘手 本文将一一介绍如何使用 MapReduce 处理 XML 文件, 并且能够对其进行分割, 然后并行处理这些分片 MapReduce 里面读取文件一般都是使用 InputFormat 类进行的, 比如读取文本文件可以使用 TextInputFormat 类进行 ( 关于 InputFormat 类的源码分析可以参见 MapReduce 数据输入中 InputFormat 类源码解析 ), 所以我们在 MapReduce 中解析 XML 文件也可以自定义类似的 XMLInputFormat 熟悉 Mahout 的同学应该知道, 它里面实现了一个 XmlInputFormat(https://github.com/apache/mahout/blo b/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/maho ut/classifier/bayes/xmlinputformat.java) 类, 我们可以指定指定的开始和结束标记来分隔 XML 文件, 然后我们就可以像解析 Text 文件一样来解析 Xml 文件, 如下 : Configuration conf = new Configuration(); conf.set("xmlinput.start", "<property>"); conf.set("xmlinput.end", "</property>"); Job job = new Job(conf); job.setinputformatclass(xmlinputformat.class); 指定了 xmlinput.start 和 xmlinput.end 就可以搜索 XML 文件中的开始和结束标记, 从而获取到 XML 里面的数据 完整的 map 程序如下 : import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.*; import org.slf4j.*; import javax.xml.stream.*; import java.io.*; 1 / 5
public static class Map extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(longwritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String document = value.tostring(); System.out.println("'" + document + "'"); try { XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new ByteArrayInputStream(document.getBytes())); String propertyname = ""; String propertyvalue = ""; String currentelement = ""; while (reader.hasnext()) { int code = reader.next(); switch (code) { case START_ELEMENT: currentelement = reader.getlocalname(); case CHARACTERS: if (currentelement.equalsignorecase("name")) { propertyname += reader.gettext(); else if (currentelement.equalsignorecase("value")) { propertyvalue += reader.gettext(); reader.close(); context.write(propertyname.trim(), propertyvalue.trim()); catch (Exception e) { log.error("error processing '" + document + "'", e); map 接收一个 Text 的值, 里面的值包含了开始和结束的标记之间的数据, 然后我们就可以使用 jav a 内置的 XML Streaming API 解析器提取每个属性的 key 和 value, 最后输出 key 和 value 完整的代码如下 : package com.iteblog.hadoop; 2 / 5
import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.*; import org.slf4j.*; import javax.xml.stream.*; import java.io.*; import static javax.xml.stream.xmlstreamconstants.*; public final class XML{ private static final Logger log = LoggerFactory.getLogger (HadoopPropertyXMLMapReduce.class); public static class Map extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(longwritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String document = value.tostring(); System.out.println("'" + document + "'"); try { XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new ByteArrayInputStream(document.getBytes())); String propertyname = ""; String propertyvalue = ""; String currentelement = ""; while (reader.hasnext()) { int code = reader.next(); switch (code) { case START_ELEMENT: currentelement = reader.getlocalname(); case CHARACTERS: if (currentelement.equalsignorecase("name")) { propertyname += reader.gettext(); else if (currentelement.equalsignorecase("value")) { propertyvalue += reader.gettext(); 3 / 5
reader.close(); context.write(propertyname.trim(), propertyvalue.trim()); catch (Exception e) { log.error("error processing '" + document + "'", e); public static void main(string... args) throws Exception { runjob(args[0], args[1]); public static void runjob(string input, String output) throws Exception { Configuration conf = new Configuration(); conf.set("key.value.separator.in.input.line", " "); conf.set("xmlinput.start", "<property>"); conf.set("xmlinput.end", "</property>"); Job job = new Job(conf); job.setjarbyclass(xml.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); job.setmapperclass(map.class); job.setinputformatclass(xmlinputformat.class); job.setnumreducetasks(0); job.setoutputformatclass(textoutputformat.class); FileInputFormat.setInputPaths(job, new Path(input)); Path outpath = new Path(output); FileOutputFormat.setOutputPath(job, outpath); outpath.getfilesystem(conf).delete(outpath, true); job.waitforcompletion(true); 我们可以使用 hadoop 内部的 core-site.xml 文件做测试, 如下 : $ bin/hadoop jar xml.jar com.iteblog.hadoop.xml input/core-site.xml output/ 4 / 5
Powered by TCPDF (www.tcpdf.org) 使用 MapReduce 读取 XML 文件 运行完上面的程序, 可以在 output 目录下产生几个 part* 的文件, 这说明解析 XML 文件的程序起作用了 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 5 / 5