Šioje pamokoje sužinosite naudoti „Hadoop“ su „MapReduce“ pavyzdžiais. Naudojami įvesties duomenys yra SalesJan2009.csv. Joje pateikiama su pardavimais susijusi informacija, pvz., Produkto pavadinimas, kaina, mokėjimo būdas, miestas, kliento šalis ir kt. Tikslas yra sužinoti kiekvienoje šalyje parduotų produktų skaičių.
Šioje pamokoje sužinosite
- Pirmoji „Hadoop MapReduce“ programa
- „SalesMapper“ klasės paaiškinimas
- „SalesCountryReducer“ klasės paaiškinimas
- „SalesCountryDriver“ klasės paaiškinimas
Pirmoji „Hadoop MapReduce“ programa
Dabar šioje „MapReduce“ mokymo programoje sukursime savo pirmąją „Java MapReduce“ programą:
Įsitikinkite, kad įdiegėte „Hadoop“. Prieš pradėdami nuo faktinio proceso, pakeiskite vartotoją į „hduser“ (ID naudojamas naudojant „Hadoop“ konfigūraciją, galite pereiti į „userid“, naudojamą per „Hadoop“ programavimo konfigūraciją).
su - hduser_
1 žingsnis)
Žemiau pateiktame MapReduce pavyzdyje sukurkite naują katalogą su pavadinimu MapReduceTutorial kaip shwon
sudo mkdir MapReduceTutorial
Suteikite leidimus
sudo chmod -R 777 MapReduceTutorial
„SalesMapper.java“
package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}
„SalesCountryReducer.java“
package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer{public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}
„SalesCountryDriver.java“
package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}
Atsisiųskite failus čia
Patikrinkite visų šių failų leidimus
ir jei trūksta „skaitymo“ leidimų, suteikite tuos pačius
2 žingsnis)
Eksportuoti klasės kelią, kaip parodyta toliau pateiktame „Hadoop“ pavyzdyje
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
3 žingsnis)
Sudarykite „Java“ failus (šie failai yra kataloge „ Final-MapReduceHandsOn“ ). Jos klasės failai bus dedami į paketų katalogą
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Šio įspėjimo galima saugiai nepaisyti.
Šis kompiliavimas sukurs katalogą dabartiniame kataloge, pavadintą paketo pavadinimu, nurodytu „Java“ šaltinio faile (ty mūsų atveju „ SalesCountry “) ir įdės į jį visus sukompiliuotus klasės failus.
4 žingsnis)
Sukurkite naują failą Manifest.txt
sudo gedit Manifest.txt
prie jo pridėti šias eilutes,
Main-Class: SalesCountry.SalesCountryDriver
„SalesCountry.SalesCountryDriver“ yra pagrindinės klasės pavadinimas. Atminkite, kad šios eilutės pabaigoje turite paspausti klavišą Enter.
5 žingsnis)
Sukurkite „Jar“ failą
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Patikrinkite, ar sukurtas stiklainio failas
6 žingsnis)
Paleiskite „Hadoop“
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
7 žingsnis)
Nukopijuokite failą SalesJan2009.csv į ~ / inputMapReduce
Dabar naudokite žemiau esančią komandą, norėdami nukopijuoti ~ / inputMapReduce į HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Mes galime saugiai ignoruoti šį įspėjimą.
Patikrinkite, ar failas iš tikrųjų nukopijuotas, ar ne.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
8 žingsnis)
Paleiskite „MapReduce“ užduotį
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Tai sukurs išvesties katalogą pavadinimu mapreduce_output_sales HDFS. Šio katalogo turinys bus failas, kuriame bus parduodami produktai kiekvienoje šalyje.
9 žingsnis)
Rezultatas gali būti matomas per komandos sąsają,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Rezultatus taip pat galima pamatyti per interneto sąsają
Atidarykite r žiniatinklio naršyklėje.
Dabar pasirinkite „Naršyti failų sistemą“ ir eikite į / mapreduce_output_sales
Atidarykite „ r-00000“ dalį
„SalesMapper“ klasės paaiškinimas
Šiame skyriuje suprasime „ SalesMapper“ klasės įgyvendinimą .
1. Pirmiausia nurodome paketo pavadinimą savo klasei. „SalesCountry“ yra mūsų paketo pavadinimas. Atkreipkite dėmesį, kad kompiliacijos išvestis „SalesMapper.class “ pateks į katalogą, pavadintą šiuo paketo pavadinimu: „ SalesCountry“ .
Po to mes importuojame bibliotekos paketus.
Žemiau fotografiją parodytas įgyvendinimas SalesMapper klasėje
Kodo pavyzdys Paaiškinimas:
1. „SalesMapper“ klasės apibrėžimas-
viešosios klasės „SalesMapper“ praplečia „MapReduceBase“ įrankius „Mapper“
Kiekviena žemėlapių klasė turi būti išplėsta iš MapReduceBase klasės ir turi įdiegti Mapper sąsają.
2. „Žemėlapio“ funkcijos apibrėžimas
public void map(LongWritable key,Text value,OutputCollectoroutput,Reporter reporter) throws IOException
Pagrindinė „Mapper“ klasės dalis yra „map ()“ metodas, kuris priima keturis argumentus.
Kiekvieną kartą paskambinus į „map ()“ metodą, perduodama raktų ir verčių pora ( „raktas“ ir „reikšmė“ šiame kode).
„map ()“ metodas prasideda dalijant įvesties tekstą, kuris gaunamas kaip argumentas. Šias eilutes į žodžius suskirsto naudodamas žymeklį.
String valueString = value.toString();String[] SingleCountryData = valueString.split(",");
Čia kaip atribiklis naudojamas „,“ .
Po to suformuojama pora, naudojant įrašą 7-ame masyvo „SingleCountryData“ indekse ir reikšmę „1“ .
output.collect (naujas tekstas („SingleCountryData“ [7]), vienas);
Mes renkamės įrašą 7-ajame indekse, nes mums reikalingi šalies duomenys ir jis yra 7-ame masyvo „SingleCountryData“ rodyklėje .
Atkreipkite dėmesį, kad mūsų įvesties duomenys pateikiami žemiau pateiktu formatu (kur šalis yra 7 -ajame indekse, o pradinis indeksas yra 0) -
Sandorio data, produktas, kaina, mokėjimo_Type, vardas, miestas, valstija, šalis , paskyra sukurta, paskutinis prisijungimas, platuma, ilguma
Mapper išvestis vėlgi yra raktų ir verčių pora, kuri išvedama naudojant „OutputCollector“ metodą „collect () “ .
„SalesCountryReducer“ klasės paaiškinimas
Šiame skyriuje suprasime „ SalesCountryReducer“ klasės įgyvendinimą .
1. Pirmiausia nurodome paketo pavadinimą savo klasei. „SalesCountry“ yra ne paketo pavadinimas. Atkreipkite dėmesį, kad kompiliacijos išvestis „SalesCountryReducer.class “ pateks į katalogą, pavadintą šiuo paketo pavadinimu: „ SalesCountry“ .
Po to mes importuojame bibliotekos paketus.
Žemiau fotografiją parodytas įgyvendinimas SalesCountryReducer klasėje
Kodo paaiškinimas:
1. SalesCountryReducer klasės apibrėžimas-
public class „SalesCountryReducer“ praplečia „MapReduceBase“ įrankius „Reducer“
Pirmieji du duomenų tipai „Tekstas“ ir „IntWritable“ yra reduktoriaus įvesties rakto vertės duomenų tipas.
Žemėlapio rodyklės forma yra
Paskutiniai du duomenų tipai, „Tekstas“ ir „IntWritable“, yra reduktoriaus generuojamas išvesties duomenų tipas raktų ir reikšmės poros pavidalu.
Kiekviena reduktoriaus klasė turi būti išplėsta iš MapReduceBase klasės ir ji turi įdiegti Reducer sąsają.
2. „Sumažinti“ funkcijos apibrėžimas
public void reduce( Text t_key,Iteratorvalues,OutputCollector output,Reporter reporter) throws IOException {
Metodo reduc () įvestis yra raktas su kelių verčių sąrašu.
Pavyzdžiui, mūsų atveju bus
Tai suteikiama reduktoriui kaip
Taigi, norint priimti šios formos argumentus, naudojami pirmieji du duomenų tipai, ty tekstas ir iteratorius
Kitas argumentas yra OutputCollector
metodas sumažinti () prasideda nukopijuojant rakto vertę ir inicijuojant dažnio skaičių iki 0.
Teksto raktas = t_key; int frequencyForCountry = 0;
Tada, naudodami kilpą „ while“ , kartojame reikšmių, susijusių su raktu, sąrašą ir apskaičiuojame galutinį dažnį, susumuodami visas reikšmes.
while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}
Dabar mes išstumiame rezultatą į išvesties kolektorių rakto ir gauto dažnio skaičiaus pavidalu .
Žemiau kodas tai daro-
output.collect(key, new IntWritable(frequencyForCountry));
„SalesCountryDriver“ klasės paaiškinimas
Šiame skyriuje suprasime „ SalesCountryDriver“ klasės įgyvendinimą
1. Pirmiausia nurodome paketo pavadinimą savo klasei. „SalesCountry“ yra ne paketo pavadinimas. Atkreipkite dėmesį, kad kompiliacijos išvestis „SalesCountryDriver.class “ pateks į katalogą, pavadintą šiuo paketo pavadinimu: „ SalesCountry“ .
Čia yra eilutė, nurodanti paketo pavadinimą, po kurio kodas importuojamas bibliotekos paketus.
2. Apibrėžkite vairuotojo klasę, kuri sukurs naują kliento darbą, konfigūracijos objektą, ir reklamuosite „Mapper“ ir „Reducer“ klases.
Vairuotojų klasė yra atsakinga už tai, kad mūsų „MapReduce“ darbas būtų rodomas „Hadoop“. Šioje klasėje mes nurodome darbo pavadinimą, įvesties / išvesties duomenų tipą, žemėlapių ir reduktorių klasių pavadinimus .
3. Žemiau esančiame kodo fragmente nustatome įvesties ir išvesties katalogus, kurie naudojami atitinkamai įvedant duomenų rinkinį ir gaminant išvestį.
arg [0] ir arg [1] yra komandinės eilutės argumentai, perduoti naudojant komandą, pateiktą „MapReduce“ rankose, ty
$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales
4. Sukelkite mūsų darbą
Žemiau kodo pradėkite vykdyti „MapReduce“ užduotį
try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}