DBInputFormat tietojen siirtämiseksi SQL: stä NoSQL-tietokantaan



Tämän blogin tarkoituksena on oppia siirtämään tietoja SQL-tietokannoista HDFS: ään, kuinka tietoja voidaan siirtää SQL-tietokannoista NoSQL-tietokantoihin.

Tässä blogissa tutkitaan Hadoop-tekniikan yhden tärkeimmän komponentin eli MapReducen ominaisuuksia.

Tänään yritykset ottavat Hadoop-kehyksen ensimmäiseksi valinnakseen tallennustilan, koska se kykenee käsittelemään suuria tietoja tehokkaasti. Mutta tiedämme myös, että data on monipuolista ja että sitä on eri rakenteissa ja muodoissa. Tällaisen valtavan määrän tietojen ja sen erilaisten muotojen hallitsemiseksi on oltava mekanismi, joka mahtuu kaikkiin lajikkeisiin ja tuottaa kuitenkin tehokkaan ja yhdenmukaisen tuloksen.





Hadoop-kehyksen tehokkain komponentti on MapReduce, joka voi hallita tietoja ja niiden rakennetta paremmin kuin muut vastaavat. Vaikka se vaatii oppimiskäyrän ja ohjelmoinnin monimutkaisuuden, voit hallita kaikenlaista dataa Hadoopin avulla, jos pystyt käsittelemään nämä monimutkaisuudet.

MapReduce Framework jakaa kaikki prosessointitehtävät kahteen vaiheeseen: Map ja Reduce.



Raakatietojen valmistelu näitä vaiheita varten edellyttää joidenkin perusluokkien ja rajapintojen ymmärtämistä. Näiden jälleenkäsittelyjen superluokka on InputFormat.

InputFormat luokka on yksi Hadoop MapReduce -sovellusliittymän ydinluokista. Tämä luokka on vastuussa kahden pääasiasta:

  • Tiedot jaetaan
  • Levynlukija

Tietojen jakaminen on Hadoop MapReduce -kehyksen peruskäsite, joka määrittelee sekä yksittäisten karttatehtävien koon että sen mahdollisen suorituspalvelimen. Levynlukija on vastuussa todellisten tietueiden lukemisesta syötetiedostosta ja niiden lähettämisestä (avain / arvo-pareittain) kartoittajalle.



Kartoittajien lukumäärä päätetään jakojen määrän perusteella. InputFormatin tehtävä on luoda halkeamat. Suurin osa ajanjakokokoista vastaa lohkon kokoa, mutta ei aina, että jakoja luodaan HDFS-lohkokoon perusteella. Se riippuu täysin siitä, miten InputFormatin getSplits () -menetelmä on ohitettu.

MR-jaon ja HDFS-lohkon välillä on perustavanlaatuinen ero. Lohko on fyysinen tiedonkappale, kun taas jako on vain looginen pala, jonka kartoitin lukee. Jako ei sisällä syötetietoja, sillä on vain tietojen viite tai osoite. Jaossa on periaatteessa kaksi asiaa: Pituus tavuina ja joukko tallennuspaikkoja, jotka ovat vain merkkijonoja.

Ymmärrämme tämän paremmin ottamalla yhden esimerkin: MySQL: ään tallennettujen tietojen käsittely MR: n avulla. Koska tässä tapauksessa ei ole olemassa lohkojen käsitettä, teoria: 'splitit luodaan aina HDFS-lohkon perusteella',epäonnistuu. Yksi mahdollisuus on luoda jakoja MySQL-taulukon rivialueiden perusteella (ja tämän tekee DBInputFormat, syöttömuoto relaatiotietokantojen tietojen lukemiseen). Meillä voi olla k lukumäärää, jotka koostuvat n rivistä.

Jaot luodaan vain FileInputFormat-pohjaisiin InputFormat-tiedostoihin (InputFormat tiedostoihin tallennettujen tietojen käsittelyyn) perustuen syötetiedostojen kokoon tavuina. Syötetiedostojen FileSystem-lohkokokoa pidetään kuitenkin tulohalkojen ylärajana. Jos sinulla on tiedosto, joka on pienempi kuin HDFS-lohkokoko, saat tälle tiedostolle vain yhden kartoittajan. Jos haluat käyttäytyä eri tavalla, voit käyttää mapred.min.split.size. Mutta se riippuu jälleen vain InputFormatin getSplitsistä ().

Meillä on niin monta olemassa olevaa syöttömuotoa, jotka ovat käytettävissä paketissa org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

java miten lopettaa ohjelma

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

luo parametri taulukkoon

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Oletusarvo on TextInputFormat.

Vastaavasti meillä on niin monta tulostusmuotoa, joka lukee pienentäjien tiedot ja tallentaa ne HDFS: ään:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Oletusarvo on TextOutputFormat.

Kun olet lukenut tämän blogin, olisit oppinut:

  • Kuinka kirjoittaa kartan vähennysohjelma
  • Tietoja Mapreducessa saatavilla olevista erityyppisistä InputFormateista
  • Mikä on InputFormatsin tarve
  • Kuinka kirjoittaa mukautettuja InputFormats-tiedostoja
  • Kuinka siirtää tietoja SQL-tietokannoista HDFS: ään
  • Kuinka siirtää tietoja SQL (tässä MySQL) -tietokannoista NoSQL-tietokantoihin (tässä Hbase)
  • Kuinka siirtää tietoja yhdestä SQL-tietokannasta toiseen SQL-tietokantojen taulukkoon (Ehkä tämä ei välttämättä ole niin tärkeää, jos teemme tämän samassa SQL-tietokannassa. Ei kuitenkaan ole mitään vikaa siitä, että meillä on tieto samasta. Et voi koskaan tietää miten se voi tulla käyttöön)

Edellytys:

  • Hadoop esiasennettu
  • SQL on esiasennettu
  • Hbase esiasennettu
  • Java-perustiedot
  • MapReduce tietoa
  • Hadoop-kehyksen perustiedot

Ymmärretään ongelman ratkaisu, jonka aiomme ratkaista täällä:

Meillä on työntekijätaulukko MySQL DB: ssä relaatiotietokannassamme Edureka. Yritysvaatimuksen mukaan meidän on siirrettävä kaikki relaatiotietokannassa käytettävissä olevat tiedot Hadoop-tiedostojärjestelmään, eli HDFS, NoSQL DB, jotka tunnetaan nimellä Hbase.

Meillä on monia vaihtoehtoja tämän tehtävän suorittamiseen:

  • Sqoop
  • Flume
  • MapReduce

Nyt et halua asentaa ja määrittää muita työkaluja tätä toimintoa varten. Sinulla on vain yksi vaihtoehto, joka on Hadoopin prosessointikehys MapReduce. MapReduce-kehys antaisi sinulle täyden hallinnan tietojen siirron aikana. Voit manipuloida sarakkeita ja sijoittaa ne suoraan mihin tahansa kahdesta kohdesijainnista.

Huomautus:

  • Meidän on ladattava ja asetettava MySQL-liitin Hadoopin luokkatielle hakemaan taulukoita MySQL-taulukoista. Tätä varten lataa liitin com.mysql.jdbc_5.1.5.jar ja pidä se hakemistossa Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Lataukset / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Laita myös kaikki Hbase-purkit Hadoop-luokkaradan alle, jotta MR-ohjelmasi voi käyttää Hbase-järjestelmää. Suorita tämä suorittamalla seuraava komento :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Ohjelmistoversiot, joita olen käyttänyt tämän tehtävän suorittamisessa, ovat:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Pimennys Kuu

Ohjelman välttämiseksi missään yhteensopivuusongelmassa määrään lukijani suorittamaan komennon samanlaisessa ympäristössä.

Mukautettu DBInputWritable:

paketti com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implementates Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) heittää IOException {} public void readFields (ResultSet rs) throws SQLException // Tulosjoukko-objekti edustaa SQL-käskystä palautettuja tietoja {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) heittää IOException { } public void write (PreparedStatement ps) heittää SQLException {ps.setInt (1, id) ps.setString (2, nimi) ps.setString (3, osasto)} public int getId () {return id} public String getName () {return name} julkinen merkkijono getDept () {return dept}}

Mukautettu DBOutputWritable:

paketti com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implementates Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = nimi this.id = id this.dept = dept} public void readFields (DataInput in) heittää IOException {} public void readFields (ResultSet rs) heittää SQLException {} public void write (DataOutput out) heittää IOException {} public void write (PreparedStatement ps) heittää SQLException {ps.setString (1, nimi) ps.setInt (2, id) ps.setString (3, osasto)}}

Syöttötaulukko:

luoda tietokanta edureka
luo taulukon emp (empid int not null, nimi varchar (30), dept varchar (20), ensisijainen avain (empid))
lisää emp-arvoihin (1, 'abhay', 'kehitys'), (2, 'brundesh', 'testi')
valitse * emp

Tapaus 1: Siirto MySQL: stä HDFS: ään

paketti com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Työn tuonti org.apache.hadoop.mapreduce.lib.db.DBConfiguration tuonti org.apache.hadoop.mapreduce.lib.db.DBInputFormat tuonti org.apache.hadoop.mapreduce.lib.output.FileOutputFormat tuonti org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) heittää poikkeuksen {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // ajuriluokka' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // käyttäjänimi' root ') // salasana Job job = uusi Job (conf) työ .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Teksti.luokka) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormet.utomateriaali uusi polku (args [0])) DBInputFormat.setInput (työ, DBInputWritable.class, 'emp', // syötetaulukon nimi null, null, uusi merkkijono [] {'empid', 'name', 'dept'} / / taulukon sarakkeet) Polku p = uusi polku (args [0]) FileSystem fs = FileSystem.get (uusi URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Tämän koodinpalan avulla voimme valmistella tai määrittää syötemuodon pääsemään lähde-SQL-tietokantaan. Parametri sisältää ohjainluokan, URL-osoitteessa on SQL-tietokannan osoite, sen käyttäjänimi ja salasana.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // ajuriluokka 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // käyttäjänimi 'root') //Salasana

Tämän koodinpalan avulla voimme välittää tietokannan taulukkojen yksityiskohdat ja asettaa ne työobjektiin. Parametrit sisältävät tietysti työilmentymän, mukautettavan kirjoitettavan luokan, jonka on toteutettava DBWritable-käyttöliittymä, lähdetaulukon nimen, ehdon mahdollisen nollan, minkä tahansa lajitteluparametrin muu nolla, taulukon sarakkeiden luettelon vastaavasti.

DBInputFormat.setInput (työ, DBInputWritable.class, 'emp', // syötetaulukon nimi null, null, uusi merkkijono [] {'empid', 'name', 'dept'} // taulukon sarakkeet)

Mapper

paketti com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map laajentaa Mapperia {
suojattu mitätöity kartta (LongWritable-avain, DBInputWritable-arvo, konteksti ctx) {kokeile {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (uusi teksti (nimi + '' + id + '' + osasto), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reducer: Identity Reducer käytetty

Komento suoritettavaksi:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Tulos: MySQL-taulukko siirretty HDFS: ään

hadoop dfs -ls / dbtohdfs / *

Tapaus 2: Siirto MySQL: n yhdestä taulukosta toiseen MySQL: ssä

luodaan taulukko MySQL: ssä

luo taulukon työntekijä1 (nimi varchar (20), id int, dept varchar (20))

paketti com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) heittää poikkeuksen {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // käyttäjänimi' root ') // salasana Job job = uusi Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) työ .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (työ, DBInputWritable.class, 'emp', // syötetaulukon nimi null, null, uusi merkkijono [] {'empid ',' nimi ',' osasto '} // taulukon sarakkeet) DBOutputFormat.setOutput (työ,' työntekijä1 ', // tulostetaulukon nimi uusi merkkijono [] {' nimi ',' id ',' osasto '} // taulukko sarakkeet) System.exit (job.waitForCompletion (true)? 0: 1)}}

Tämän koodinpalan avulla voimme määrittää lähtötaulukon nimen SQL DB: ssä.Parametrit ovat vastaavasti työilmentymä, lähtötaulukon nimi ja ulostulosarakkeen nimet.

DBOutputFormat.setOutput (työ, 'työntekijä1', // lähtötaulukon nimi uusi merkkijono [] {'nimi', 'id', 'osasto'} // taulukon sarakkeet)

Mapper: Sama kuin tapaus 1

Reduktori:

paketti com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce laajentaa Reducer {suojattu void reduc (tekstiavain, Iterable-arvot, Context ctx) {int summa = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (rivi [0] .toString (), Integer.parseInt (rivi [1] .toString ()), rivi [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Komento suoritettavaksi:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Tulos: Siirretty data MySQL: n EMP-taulukosta toiselle taulukon työntekijälle1 MySQL: ssä

Tapaus 3: Siirto MySQL-taulukosta NoSQL (Hbase) -taulukkoon

Hbase-taulukon luominen SQL-taulukon lähdön mukauttamiseksi:

luo 'työntekijä', 'virallinen_info'

Kuljettajan luokka:

paketti Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat Tuo org.apache.hadoop.hbase.mapreduce.TableOutputFormat Tuo org.apache.hadoop.hbase.HBaseConfiguration Tuo org.apache.hadoop.hbase.client.HTable tuoda org.apache.hadoop.hbase.client.HTableInterface Tuo org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) heittää poikkeuksen {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // käyttäjänimi 'root') // salasana Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('työntekijä', Pienennä.luokka, työ) job.setInputFormFormatClassFassFormFormATClassFormFormATClassFormFormATClassFormFormATClassFormFormatClassicMaterialTyöntöMuuttuja (työmatto). luokka) DBInputFormat.setInput (työ, DBInputWritable.class, 'emp', // syötetaulukon nimi null, null, uusi merkkijono [] {'empid', 'nimi', 'osasto'} // taulukon sarakkeet) System.exit (job.waitForCompletion (tosi)? 0: 1)}}

Tämän koodinpalan avulla voit määrittää lähtöavaimen luokan, joka hbase-tapauksessa on ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Tässä välitämme hbase-taulukon nimen ja pelkistimen toimimaan pöydällä.

TableMapReduceUtil.initTableReducerJob ('työntekijä', Vähennä.luokka, työ)

Mapper:

paketti Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map laajentaa Mapperia {private IntWritable one = uusi IntWritable (1) suojattu tyhjä kartta (LongWritable id, DBInputWritable value, Context context) {kokeile {Merkkijono = arvo.getName () Merkkijono cd = arvo.getId () + '' String osasto = arvo.getDept () context.write (uusi ImmutableBytesWritable (tavua.toBytes (cd)), uusi teksti (rivi + ') '+ osasto))} saalis (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Tässä koodinpätkässä otamme arvot DBinputwritable-luokan gettereistä ja välitämme ne sitten
ImmutableBytesWritable niin, että ne saavuttavat vähennysventtiilin tavutekniikkamuodossa, jonka Hbase ymmärtää.

Merkkijono = arvo.getName () Merkkijono cd = arvo.getId () + '' Merkkijonoosasto = arvo.getDept () context.write (uusi ImmutableBytesWritable (tavua.toBytes (cd)), uusi teksti (rivi + '' + osasto ))

Reduktori:

paketti Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes tuo org.apache.hadoop.io.Text public class Reduce laajentaa TableReducer {public void reduc (ImmutableBytesWritable key, Iterable values, Context context) heittää IOException, InterruptedException {String [] syy = null // Loop-arvot for (Teksti val: arvot) {cause = val.toString (). split ('')} // Put to HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Tavua - tavua ('nimi'), tavua - tavua (aiheuttaa [0])) laittaa lisää (tavua - tavua (virallinen_info), tavua - tavua (osasto), tavua - tavu [aiheuttaa] ])) context.write (avain, laita)}}

Tämän koodinpalan avulla voimme päättää tarkan rivin ja sarakkeen, johon haluaisimme tallentaa arvot pelkistimestä. Täällä me tallennamme jokaisen empidin erilliselle riville, kun teimme empidin rivinäavaimeksi, joka olisi ainutlaatuinen. Kullekin riville tallennamme työntekijöiden viralliset tiedot sarakkeeseen ”virallinen_info” sarakkeisiin ”nimi” ja ”osasto”.

Put put = new Put (key.get ()) put.add (tavua.toBytes ('virallinen_info'), tavua.toBytes ('nimi'), tavua.toBytes (aiheuttaa [0])) put.add (tavua. toBytes ('virallinen_info'), Bytes.toBytes ('osasto'), Bytes.toBytes (syy [1])) context.write (avain, put)

Siirretyt tiedot Hbase:

skannaa työntekijä

Kuten näemme, pystyimme suorittamaan liiketoimintatietojemme siirtämisen relaatio-SQL-tietokannasta NoSQL-tietokantaan onnistuneesti.

c ++ hyppää riville

Seuraavassa blogissa opitaan kirjoittamaan ja suorittamaan koodeja muille tulo- ja lähtömuodoille.

Jatka kommenttien, kysymysten tai palautteen lähettämistä. Haluaisin kuulla sinusta.

Onko sinulla kysymys meille? Mainitse se kommenttiosassa ja palaamme sinuun.

Aiheeseen liittyvät julkaisut: