=====Einen eigenen ersten MapReduce Job erstellen===== ==== Einleitung==== * Ein MapReduce Job verdichtet die Daten Schritt für Schritt * Die Verarbeitung erfolgt Listen orientiert * Sehr gut für die Parallelisierung geeignet * Ursprung in der funktionalen Programmierung (Funktion map() und fold() bzw reduce()) Zwei Haupt Phasen: * Map * Liest Key/Value Paare ein und gibt Key/Value Paare je nach Bedarf wieder aus ( wie z.b. sortiert) * map (input) -> list(intermediate_value) * Reduce * Verarbeitet die Key/Value Paare und giebt ebenfalls Key/Value Paare wieder aus * reduce (out_key, list(intermediate_value)) -> list(out_value) === Ein erstes Beispiel === Übersicht - eine Liste aller in den Texten vorkommenden Auto Hersteller erstellen: {{ :hadoop:mapreduce_v01.png?400 |Übersicht MapReduce}} ==== Einen eigenen MapReduce in Java für Hadoop entwickeln==== === Übersicht über die generelle Architektur === Im Beispiel sollen die Buchstaben in den Daten gezählt werden: {{ :hadoop:mapreduce_hadoop_v01.png?500 |MapReduce mit Hadoop}} Die Elemente im einzelnen: **Client**: * Der Client Driver konfiguriert den Job und sendet Job Anfrage an das Cluster * Über das Interface "JobConf" wird der Job konfiguriert **InputFormater**: * Der Mapper Prozesse verwendet einen InputFormater um die Daten zu lesen **Mapper**: * Der Mapper ließt die Daten in Key/Value Format ein * Pro Zeile (je nach InputFormater ) wird die map Methode einmal aufgerufen * Im Default ist der Key der Offset der Zeile (Zeilennummer) * Der Mapper erstellt eine Liste mit Output Values im Key/Value Format * Mapper speicher Daten lokal und werden vom Reducer dann abgeholt * Konfigurierbar ab wievel % bereits vor Abschluss Daten bereitgestellt werden sollen * Der eigene Mapper wird von der Klasse **"Mapper"** abgeleitet * Pro split/Block einer Datei wird eine Mapper Instance gestartet **Shuffle-And-Sort**: * Zuständig für das Sortieren der Ergebnisse der einzelnen Mapper als Input für den Reducer * Ein Teil wird im Mapper,ein Teil im Reducer durchgeführt **Combiner**: * Läuft auf dem selben Knoten wie der Mapper * Dient zur Verdichtung von Ergebnissen des Mappers * Wird aber nur bei Bedarf verwendet * Kann gleiche Klasse wie der Reducer sein falls (nur bei distributiven Funktionen wie A=B+C B=A+C) **Partitioner**: * Legt fest wie die Daten auf die Reducer verteilt werden sollen **Sort**: * Vor der Verarbeitung durch den Reducer werden die Daten sortiert **Reducer**: * Verarbeitet die Zwischenergebnisse * Erzeugt das Endergebnis * Die eigene Reducer Klasse wird von der Klasse **"Reducer"** abgeleitet **OutputFormat**: * Das OutputFormat legt fest wie der Reducer die Daten im HDFS ablegt * Default: Tab Format === Implementierung === Mit Hadoop 2 hat sich die API geändert, es muss daher das Package **"org.apache.hadoop.mapreduce"** verwendet werden. == Umgebung einrichten === Ziel ist es den Job mit dem JDeveloper auf einem Windows7 Rechner zu erstellen. \\ Größte Herausforderung ist dabei eine laufähige Umgebung unter Win7 einzurichten. \\ => [[nosql:hadoop_connect_from_windows|Mit MS Windows Clients mit Hadoop arbeiten]] Für den lokalen Test kann ist die Implementierung des "hadoop Tool interfaces" notwendig! Beispiel siehe => http://hadoopi.wordpress.com/2013/06/05/hadoop-implementing-the-tool-interface-for-mapreduce-driver/ hadoop jar -fs file:/// -jt local == Die Aufgabe === Es soll das in Oracle typische Beispiel, wie viele Mitarbeiter arbeiten in welcher Abteilung umgesetzt werden: select count(*), deptno from emp / Zur Zeit allerdings erstmal das Sum Beispiel implementiert um einen ersten einfachen Test zumzuseten. Die Daten werden im dem typischem emp format als CSV Liste im HDFS abgelegt. Die klassischen Testdaten erzeugen: set linesize 1000 set trimspool on set pagesize 0 set feedback off spool /tmp/emp.csv select empno || ':' || ename || ':' || job || ':' || mgr || ':' || to_char(hiredate,'dd.mm.yyyy')|| ':' || sal || ':' || comm || ':' || deptno from scott.emp order by ename / spool off CSV auf das HDFS kopieren hdfs dfs -put /tmp/emp.csv /user/gpipperr hdfs dfs -cat /user/gpipperr/emp.csv 7499:ALLEN:SALESMAN:7698:20.02.1981:1600:300:30 7782:CLARK:MANAGER:7839:09.06.1981:2450::10 7566:JONES:MANAGER:7839:02.04.1981:2975::20 7839:KING:PRESIDENT::17.11.1981:5000::10 7654:MARTIN:SALESMAN:7698:28.09.1981:1250:1400:30 7934:MILLER:CLERK:7782:23.01.1982:1300::10 7844:TURNER:SALESMAN:7698:08.09.1981:1500:0:30 7521:WARD:SALESMAN:7698:22.02.1981:1250:500:30 == Die Mapper Klasse == package gpi.hadoop; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; // input Key - input Value - output Key - output Value public class DeptCountMapper extends Mapper { static IntWritable oneValue = new IntWritable(1); @Override // input Key - input Value - output Value public void map(Object key, Text value, Context contex) throws IOException, InterruptedException { /* input: 0 1 2 3 4 5 6 7 7876:ADAMS:CLERK:7788:12.01.1983:1100:300:20 Split in Key/Values pairs 20,1 */ // read on row String[] emprow = value.toString().split(":"); String deptno = emprow[7]; contex.write(new Text(deptno), oneValue); } } == Die Reducer Klasse == package gpi.hadoop; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class DeptCountReducer extends Reducer { private IntWritable totalWordCount = new IntWritable(); @Override public void reduce(Text deptno, Iterable counts, Context context) throws IOException, InterruptedException { //logic /* 30,1,1,1, 50,1,1 Result should look like this 30 3 60 2 */ int deptcount = 0; for (IntWritable count : counts) { deptcount += 1; } context.write(deptno, new IntWritable(deptcount)); } } == Driver == package gpi.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class DeptCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 3) { System.err.println("Usage: DeptCount [input] [output]"); System.exit(2); } else { System.out.println("Call DeptCount with Parameter 1::"+args[1]+" Parameter 2::"+args[2]); } // create a new Configuration Job job = Job.getInstance(conf); job.setJobName(args[0]); // Mapper job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(DeptCountMapper.class); // Reducer job.setReducerClass(DeptCountReducer.class); job.setOutputFormatClass(TextOutputFormat.class); // Input and Output Path to the data FileInputFormat.setInputPaths(job, new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); //main driver Class job.setJarByClass(DeptCountT.class); //set Output Class job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.submit(); } } Beispiel für die Verwendung des Tool Interfaces für die Startklasse: package gpi.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class DeptCountT extends Configured implements Tool { public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new DeptCountT(), args); System.exit(res); } @Override public int run(String[] args) throws Exception { //get Config object Configuration conf = this.getConf(); // create a new Configuration Job job = Job.getInstance(conf); job.setJobName(args[0]); // Mapper job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(DeptCountMapper.class); // Reducer job.setReducerClass(DeptCountReducer.class); job.setOutputFormatClass(TextOutputFormat.class); // Input and Output Path to the data FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //main driver Class job.setJarByClass(DeptCount.class); //set Output Class job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // Execute job and return status return job.waitForCompletion(true) ? 0 : 1; } } Jar file erzeugen und auf den Hadoop Server kopieren => [[https://blogs.oracle.com/bwb/resource/Jdev_jar_deployments/Creating_Jar_Deployments_with_JDeveloper.html|Jar File im Oracle JDeveloper]] Jar File aufrufen: yarn jar GpiHadoopExamples.jar gpi.hadoop.DeptCount /user/gpipperr/emp.csv /user/gpipperr/empRun1 yarn application -list hdfs dfs -cat /user/gpipperr/empRun1/part-r-00000 10 3 20 1 30 4 **Hinweise** Wird mit **yarn jar "** der Job aufgerufen, ist der Parameter 1 der Klassenamen, der Parameter 2 das in Verzeichnis und der Parameter 3 das out Verzeichnis. D.h. passt aber nicht zu der Verwendung der Eingabe Parameter in den verbreiteten Dokumentationen und Beispielen. Fehler: istsException: Output directory hdfs://quickstart.cloudera:8020/user/gpipperr/emp.csv already exists Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://quickstart.cloudera:8020/user/gpipperr/emp.csv already exists Nur wenn in der Jar Datei die Main Klasse definiert wird und ein Aufruf nach diesem Muster durchgeführt wird, passt das im Detail: **yarn jar "** Ansonsten muss bei Fehler mit den Aufrufparamentern (Typischerweise "Output directory xxx already exists" ) der Index der Parameter im Code entsprechend angepasst werden. === Weiteres Beispiel=== => [[nosql:oracle_nosql_hadoop_integration|Die Oracle NoSQL per MapReduce mit Hadoop verwenden]] ==== Quellen ==== Gute weitere Beispiele: * http://wiki.apache.org/hadoop/WordCount * http://stevekrenzel.com/finding-friends-with-mapreduce * http://www.javacodegeeks.com/2013/07/mapreduce-algorithms-understanding-data-joins-part-1.html MapAndReduce Patterns: * http://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/ * http://courses.cs.washington.edu/courses/cse490h/08au/lectures/MapReduceDesignPatterns-UW2.pdf * http://www.ccs.neu.edu/home/mirek/classes/2012-F-CS6240/Slides/4-DesignPatterns.pdf * http://chandramanitiwary.wordpress.com/2012/08/18/mapreduce-secondary-sort/ Dell Zhang * http://www.dcs.bbk.ac.uk/~dell/teaching/cc/ * http://www.dcs.bbk.ac.uk/~dell/teaching/cc/book/ditp/ditp_ch3.pdf * https://github.com/lintool/MapReduce-course-2013s Buch: * MapReduce cookbook