Hadoopas & „Mapreduce“ pavyzdžiai: sukurkite pirmąją programą „Java“

Turinys:

Anonim

Šioje pamokoje sužinosite naudoti „Hadoop“ su „MapReduce“ pavyzdžiais. Naudojami įvesties duomenys yra SalesJan2009.csv. Joje pateikiama su pardavimais susijusi informacija, pvz., Produkto pavadinimas, kaina, mokėjimo būdas, miestas, kliento šalis ir kt. Tikslas yra sužinoti kiekvienoje šalyje parduotų produktų skaičių.

Šioje pamokoje sužinosite

  • Pirmoji „Hadoop MapReduce“ programa
  • „SalesMapper“ klasės paaiškinimas
  • „SalesCountryReducer“ klasės paaiškinimas
  • „SalesCountryDriver“ klasės paaiškinimas

Pirmoji „Hadoop MapReduce“ programa

Dabar šioje „MapReduce“ mokymo programoje sukursime savo pirmąją „Java MapReduce“ programą:

2009 m. Sausio mėn. Duomenys

Įsitikinkite, kad įdiegėte „Hadoop“. Prieš pradėdami nuo faktinio proceso, pakeiskite vartotoją į „hduser“ (ID naudojamas naudojant „Hadoop“ konfigūraciją, galite pereiti į „userid“, naudojamą per „Hadoop“ programavimo konfigūraciją).

su - hduser_

1 žingsnis)

Žemiau pateiktame MapReduce pavyzdyje sukurkite naują katalogą su pavadinimu MapReduceTutorial kaip shwon

sudo mkdir MapReduceTutorial

Suteikite leidimus

sudo chmod -R 777 MapReduceTutorial

„SalesMapper.java“

package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper  {private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector  output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}

„SalesCountryReducer.java“

package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer {public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}

„SalesCountryDriver.java“

package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}

Atsisiųskite failus čia

Patikrinkite visų šių failų leidimus

ir jei trūksta „skaitymo“ leidimų, suteikite tuos pačius

2 žingsnis)

Eksportuoti klasės kelią, kaip parodyta toliau pateiktame „Hadoop“ pavyzdyje

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

3 žingsnis)

Sudarykite „Java“ failus (šie failai yra kataloge „ Final-MapReduceHandsOn“ ). Jos klasės failai bus dedami į paketų katalogą

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Šio įspėjimo galima saugiai nepaisyti.

Šis kompiliavimas sukurs katalogą dabartiniame kataloge, pavadintą paketo pavadinimu, nurodytu „Java“ šaltinio faile (ty mūsų atveju „ SalesCountry “) ir įdės į jį visus sukompiliuotus klasės failus.

4 žingsnis)

Sukurkite naują failą Manifest.txt

sudo gedit Manifest.txt

prie jo pridėti šias eilutes,

Main-Class: SalesCountry.SalesCountryDriver

„SalesCountry.SalesCountryDriver“ yra pagrindinės klasės pavadinimas. Atminkite, kad šios eilutės pabaigoje turite paspausti klavišą Enter.

5 žingsnis)

Sukurkite „Jar“ failą

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Patikrinkite, ar sukurtas stiklainio failas

6 žingsnis)

Paleiskite „Hadoop“

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

7 žingsnis)

Nukopijuokite failą SalesJan2009.csv į ~ / inputMapReduce

Dabar naudokite žemiau esančią komandą, norėdami nukopijuoti ~ / inputMapReduce į HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Mes galime saugiai ignoruoti šį įspėjimą.

Patikrinkite, ar failas iš tikrųjų nukopijuotas, ar ne.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

8 žingsnis)

Paleiskite „MapReduce“ užduotį

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Tai sukurs išvesties katalogą pavadinimu mapreduce_output_sales HDFS. Šio katalogo turinys bus failas, kuriame bus parduodami produktai kiekvienoje šalyje.

9 žingsnis)

Rezultatas gali būti matomas per komandos sąsają,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Rezultatus taip pat galima pamatyti per interneto sąsają

Atidarykite r žiniatinklio naršyklėje.

Dabar pasirinkite „Naršyti failų sistemą“ ir eikite į / mapreduce_output_sales

Atidarykite „ r-00000“ dalį

„SalesMapper“ klasės paaiškinimas

Šiame skyriuje suprasime „ SalesMapper“ klasės įgyvendinimą .

1. Pirmiausia nurodome paketo pavadinimą savo klasei. „SalesCountry“ yra mūsų paketo pavadinimas. Atkreipkite dėmesį, kad kompiliacijos išvestis „SalesMapper.class “ pateks į katalogą, pavadintą šiuo paketo pavadinimu: „ SalesCountry“ .

Po to mes importuojame bibliotekos paketus.

Žemiau fotografiją parodytas įgyvendinimas SalesMapper klasėje

Kodo pavyzdys Paaiškinimas:

1. „SalesMapper“ klasės apibrėžimas-

viešosios klasės „SalesMapper“ praplečia „MapReduceBase“ įrankius „Mapper“ {

Kiekviena žemėlapių klasė turi būti išplėsta iš MapReduceBase klasės ir turi įdiegti Mapper sąsają.

2. „Žemėlapio“ funkcijos apibrėžimas

public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter) throws IOException

Pagrindinė „Mapper“ klasės dalis yra „map ()“ metodas, kuris priima keturis argumentus.

Kiekvieną kartą paskambinus į „map ()“ metodą, perduodama raktų ir verčių pora ( „raktas“ ir „reikšmė“ šiame kode).

„map ()“ metodas prasideda dalijant įvesties tekstą, kuris gaunamas kaip argumentas. Šias eilutes į žodžius suskirsto naudodamas žymeklį.

String valueString = value.toString();String[] SingleCountryData = valueString.split(",");

Čia kaip atribiklis naudojamas „,“ .

Po to suformuojama pora, naudojant įrašą 7-ame masyvo „SingleCountryData“ indekse ir reikšmę „1“ .

output.collect (naujas tekstas („SingleCountryData“ [7]), vienas);

Mes renkamės įrašą 7-ajame indekse, nes mums reikalingi šalies duomenys ir jis yra 7-ame masyvo „SingleCountryData“ rodyklėje .

Atkreipkite dėmesį, kad mūsų įvesties duomenys pateikiami žemiau pateiktu formatu (kur šalis yra 7 -ajame indekse, o pradinis indeksas yra 0) -

Sandorio data, produktas, kaina, mokėjimo_Type, vardas, miestas, valstija, šalis , paskyra sukurta, paskutinis prisijungimas, platuma, ilguma

Mapper išvestis vėlgi yra raktų ir verčių pora, kuri išvedama naudojant „OutputCollector“ metodą „collect () .

„SalesCountryReducer“ klasės paaiškinimas

Šiame skyriuje suprasime „ SalesCountryReducer“ klasės įgyvendinimą .

1. Pirmiausia nurodome paketo pavadinimą savo klasei. „SalesCountry“ yra ne paketo pavadinimas. Atkreipkite dėmesį, kad kompiliacijos išvestis „SalesCountryReducer.class “ pateks į katalogą, pavadintą šiuo paketo pavadinimu: „ SalesCountry“ .

Po to mes importuojame bibliotekos paketus.

Žemiau fotografiją parodytas įgyvendinimas SalesCountryReducer klasėje

Kodo paaiškinimas:

1. SalesCountryReducer klasės apibrėžimas-

public class „SalesCountryReducer“ praplečia „MapReduceBase“ įrankius „Reducer“ {

Pirmieji du duomenų tipai „Tekstas“ ir „IntWritable“ yra reduktoriaus įvesties rakto vertės duomenų tipas.

Žemėlapio rodyklės forma yra , . Ši žemėlapio išvestis tampa įvestimi reduktoriui. Taigi, norint suderinti duomenų tipą, čia kaip duomenų tipas naudojamas „ Text“ ir „ IntWritable“ .

Paskutiniai du duomenų tipai, „Tekstas“ ir „IntWritable“, yra reduktoriaus generuojamas išvesties duomenų tipas raktų ir reikšmės poros pavidalu.

Kiekviena reduktoriaus klasė turi būti išplėsta iš MapReduceBase klasės ir ji turi įdiegti Reducer sąsają.

2. „Sumažinti“ funkcijos apibrėžimas

public void reduce( Text t_key,Iterator values,OutputCollector output,Reporter reporter) throws IOException {

Metodo reduc () įvestis yra raktas su kelių verčių sąrašu.

Pavyzdžiui, mūsų atveju bus

, , , , , .

Tai suteikiama reduktoriui kaip

Taigi, norint priimti šios formos argumentus, naudojami pirmieji du duomenų tipai, ty tekstas ir iteratorius . Tekstas yra raktų duomenų tipas, o „ Iterator“ - duomenų tipas, nurodantis to rakto reikšmių sąrašą.

Kitas argumentas yra OutputCollector tipo, kuris renka reduktoriaus fazės išvestį.

metodas sumažinti () prasideda nukopijuojant rakto vertę ir inicijuojant dažnio skaičių iki 0.

Teksto raktas = t_key; int frequencyForCountry = 0;

Tada, naudodami kilpą „ while“ , kartojame reikšmių, susijusių su raktu, sąrašą ir apskaičiuojame galutinį dažnį, susumuodami visas reikšmes.

 while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}

Dabar mes išstumiame rezultatą į išvesties kolektorių rakto ir gauto dažnio skaičiaus pavidalu .

Žemiau kodas tai daro-

output.collect(key, new IntWritable(frequencyForCountry));

„SalesCountryDriver“ klasės paaiškinimas

Šiame skyriuje suprasime „ SalesCountryDriver“ klasės įgyvendinimą

1. Pirmiausia nurodome paketo pavadinimą savo klasei. „SalesCountry“ yra ne paketo pavadinimas. Atkreipkite dėmesį, kad kompiliacijos išvestis „SalesCountryDriver.class “ pateks į katalogą, pavadintą šiuo paketo pavadinimu: „ SalesCountry“ .

Čia yra eilutė, nurodanti paketo pavadinimą, po kurio kodas importuojamas bibliotekos paketus.

2. Apibrėžkite vairuotojo klasę, kuri sukurs naują kliento darbą, konfigūracijos objektą, ir reklamuosite „Mapper“ ir „Reducer“ klases.

Vairuotojų klasė yra atsakinga už tai, kad mūsų „MapReduce“ darbas būtų rodomas „Hadoop“. Šioje klasėje mes nurodome darbo pavadinimą, įvesties / išvesties duomenų tipą, žemėlapių ir reduktorių klasių pavadinimus .

3. Žemiau esančiame kodo fragmente nustatome įvesties ir išvesties katalogus, kurie naudojami atitinkamai įvedant duomenų rinkinį ir gaminant išvestį.

arg [0] ir arg [1] yra komandinės eilutės argumentai, perduoti naudojant komandą, pateiktą „MapReduce“ rankose, ty

$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales

4. Sukelkite mūsų darbą

Žemiau kodo pradėkite vykdyti „MapReduce“ užduotį

try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}