RDD Sparkia käyttämällä: Apache Sparkin rakennuspalikka

Tämä Sparkia käyttävä RDD-blogi antaa sinulle yksityiskohtaisen ja kattavan tiedon RDD: stä, joka on Sparkin ja kuinka hyödyllinen se on.

, Sana itsessään riittää tuottamaan kipinää jokaisen Hadoop-insinöörin mielessä. TO n muistissa käsittelytyökalu joka on salamannopea klusterilaskennassa. MapReduceen verrattuna muistin sisäisen tiedon jakaminen tekee RDD: t 10-100x nopeammin kuin verkon ja levyn jakaminen, ja kaikki tämä on mahdollista RDD: n (joustavat hajautetut tietojoukot) takia. Avainkohdat, joihin keskitymme tänään tässä RDD: ssä Spark-artikkelissa, ovat:

Tarvitsetko RDD: itä?

Miksi tarvitsemme RDD: tä? -RDD Sparkia käyttämällä





Maailma kehittyy ja Datatiede vuonna etenemisen vuoksi . Algoritmit perustuen Regressio , , ja joka jatkuu Hajautettu Iteratiivinen laskenta ation tavalla, joka sisältää tietojen uudelleenkäytön ja jakamisen useiden laskentayksiköiden kesken.

Perinteinen tekniikat tarvitsivat vakaan välituotteen ja hajautetun tallennustilan HDFS joka käsittää toistuvia laskutoimituksia tietojen replikoinneilla ja tietojen sarjallisuudella, mikä teki prosessista paljon hitaamman. Ratkaisun löytäminen ei ollut koskaan helppoa.



Täällä RDD: t (Resilient Distributed Datasets) tulee kokonaisuuteen.

RDD Niitä on helppo käyttää ja luoda vaivattomasti, kun tietoja tuodaan tietolähteistä ja pudotetaan RDD-tiedostoihin. Lisäksi toimintoja käytetään niiden käsittelyyn. Ne ovat hajautettu muistikokoelma joilla on käyttöoikeudet Lue ainoastaan ja mikä tärkeintä, ne ovat Vikasietoinen .



luoda joukko esineitä java

Jos mitään dataosio / RDD on menetetty , se voidaan regeneroida soveltamalla samaa muutos kadonneen osion toiminta sukutaulu sen sijaan, että käsittelisit kaikkia tietoja tyhjästä. Tällainen lähestymistapa reaaliaikaisissa skenaarioissa voi saada aikaan ihmeitä tietojen menetys tilanteissa tai järjestelmän ollessa poissa käytöstä.

Mitä ovat RDD: t?

RDD tai ( Joustava hajautettu tietojoukko ) on perustavanlaatuinen tietorakenne Sparkissa. Termi Joustava määrittelee kyvyn, joka tuottaa datan automaattisesti tai datan pyöriä takaisin että alkuperäinen tila kun tapahtuu odottamaton onnettomuus tietojen häviämisen todennäköisyydellä.

RDD-tiedostoihin kirjoitetut tiedot ovat osioitu ja tallennetaan useita suoritettavia solmuja . Jos suoritettava solmu epäonnistuu ajon aikana, se saa heti varmuuskopion seuraava suoritettava solmu . Siksi RDD: itä pidetään edistyneenä tietorakenteina verrattuna muihin perinteisiin tietorakenteisiin. RDD: t voivat tallentaa strukturoitua, strukturoimatonta ja osittain strukturoitua tietoa.

Siirrytään eteenpäin RDD: n avulla Spark-blogin avulla ja opitaan RDD: n ainutlaatuisista ominaisuuksista, mikä antaa sille etulyöntiaseman muun tyyppisiin tietorakenteisiin nähden.

RDD: n ominaisuudet

  • Muistissa (RAM) Laskelmat : Muistin sisäisen laskennan käsite vie tietojenkäsittelyn nopeammin ja tehokkaammin vaiheeseen, jossa kokonaisuus esitys järjestelmän päivitetty.
  • L hänen arviointinsa : Termi Laiska arviointi sanoo muunnokset käytetään RDD: n tietoihin, mutta lähtöä ei luoda. Sen sijaan sovelletut muunnokset ovat kirjautunut.
  • Sitkeys : Tuloksena olevat RDD: t ovat aina uudelleenkäytettävä.
  • Karkearakeiset operaatiot : Käyttäjä voi soveltaa muunnoksia kaikkiin tietojoukkojen elementteihin kautta kartta, suodattaa tai ryhmittele toimintaan.
  • Vikasietoinen : Jos tietoja menetetään, järjestelmä voi vieritä takaisin sen alkuperäinen tila käyttämällä kirjautunutta muunnokset .
  • Muuttamattomuus : Määriteltyjä, haettuja tai luotuja tietoja ei voi olla muuttunut kun se on kirjautunut järjestelmään. Jos joudut käyttämään ja muokkaamaan olemassa olevaa RDD: tä, sinun on luotava uusi RDD soveltamalla joukkoa Muutos toimii nykyiselle tai edelliselle RDD: lle.
  • Osiointi : Se on ratkaiseva yksikkö Sparkin rinnakkaisuudesta RDD. Oletuksena luotujen osioiden määrä perustuu tietolähteeseesi. Voit jopa päättää, kuinka monta osiota haluat tehdä mukautettu osio toimintoja.

RDD: n luominen Sparkin avulla

RDD: t voidaan luoda kolmella tavalla:

  1. Luetaan tietoja kohteesta rinnakkaistetut kokoelmat
val PCRDD = kipinä.sparkContext.parallelize (Array ('ma', 'ti', 'ke', 'to', 'pe', 'la'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ). foreach (println)
  1. Hakeminen muutos edellisistä RDD: stä
val sanat = kipinä.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'hyvin', 'voimakas', 'kieli')) val wordpair = sanat.map (w = (w.charAt 0), w)) wordpair.collect (). Foreach (println)
  1. Luetaan tietoja kohteesta ulkoinen tallennustila tai tiedostopolkuja kuten HDFS tai HBase
val Sparkfile = kipinä.read.textFile ('/ käyttäjä / edureka_566977 / kipinä / kipinä.txt.') Sparkfile.collect ()

RDD-levyille suoritetut toiminnot:

RDD-levyille suoritetaan pääasiassa kahden tyyppisiä toimintoja:

  • Muutokset
  • Toiminnot

Muutokset : toimintaan sovellamme RDD: hen suodatin, pääsy ja muuttaa vanhemman RDD: n tiedot a peräkkäinen RDD kutsutaan muutos . Uusi RDD palauttaa osoittimen edelliseen RDD: hen varmistamalla niiden välisen riippuvuuden.

Transformaatiot ovat Laiska arviointi, toisin sanoen käyttämäsi RDD: n toiminnot kirjataan, mutta ei teloitettu. Järjestelmä heittää tuloksen tai poikkeuksen käynnistämisen jälkeen Toiminta .

Voimme jakaa muunnokset kahteen tyyppiin seuraavasti:

  • Kapeat muunnokset
  • Laaja muutos

Kapeat muunnokset Sovellamme kapeita muunnoksia kohtaan a yksi osio vanhemman RDD: n luominen uuden RDD: n luomiseksi, koska RDD: n käsittelemiseen tarvittavat tiedot ovat käytettävissä yhden osion vanhempi ASD . Esimerkkejä kapeista muunnoksista ovat:

  • kartta()
  • suodattaa()
  • tasainen kartta ()
  • osio ()
  • mapPartitions ()

Laajat muutokset: Sovellamme laajaa muunnosta useita osioita luoda uusi RDD. RDD: n käsittelemiseen tarvittavat tiedot ovat käytettävissä vanhempi ASD . Esimerkkejä laajoista muunnoksista ovat:

  • vähennäBy ()
  • liitto()

Toiminnot : Toiminnot ohjaavat Apache Sparkia käyttämään laskenta ja välitä tulos tai poikkeus kuljettajan RDD: lle. Harvaan toimintaan sisältyy:

  • kerätä()
  • Kreivi()
  • ottaa ()
  • ensimmäinen()

Sovelletaan käytännössä toimintoja RDD: llä:

IPL (Intian Premier League) on krikettiturnaus, jonka huippu on huippuluokkaa. Joten, voimme tänään päästä käsiksi IPL-tietojoukkoon ja suorittaa RDD: n käyttämällä Sparkia.

  • Ensinnäkin ladataan IPL: n CSV-ottelutiedot. Lataamisen jälkeen se alkaa näyttää EXCEL-tiedostolta, jossa on rivejä ja sarakkeita.

Seuraavassa vaiheessa käynnistämme kipinän ja lataamme match.csv-tiedoston sijainnistani, minun tapauksessanicsvtiedoston sijainti on “/User/edureka_566977/test/matches.csv”

tulosta_r merkkijonoon

Aloitetaan nyt Muutos osa ensin:

  • kartta():

Käytämme Kartan muunnos soveltaa tiettyä muunnosoperaatiota jokaiselle RDD-elementille. Täällä luomme RDD nimellä CKfile, johon säilytämmecsvtiedosto. Luomme uuden RDD: n, johon kutsutaan valtioita tallentaa kaupungin tiedot .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val-tilat = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • suodattaa():

Suodattimen muunnos, nimi itse kuvaa sen käyttöä. Käytämme tätä muunnosoperaatiota suodattamaan valikoivat tiedot annetusta tietokokoelmasta. Hakemme suodattimen toiminta tästä saat vuoden IPL-otteluiden tietueet 2017 ja tallenna se tiedostoon RDD.

val fil = CKfile.filter (rivi => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Sovellamme flatMap on muunnosoperaatio jokaiselle RDD: n elementille uuden RDD: n luomiseksi. Se on samanlainen kuin kartan muunnos. täällä me sovellammeLitteä karttaettä sylkeä Hyderabadin ottelut ja tallenna tiedotfilRDDRDD.

val filRDD = fil.flatMap (rivi => line.split ('Hyderabad')). kerää ()

  • osio ():

Jokainen data, jonka kirjoitamme RDD: ksi, jaetaan tiettyyn määrään osioita. Käytämme tätä muutosta löytääksemme osioiden määrä tiedot jaetaan tosiasiallisesti.

val fil = CKfile.filter (rivi => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Pidämme MapPatitionsia vaihtoehtona Map (): lle jajokaiselle() yhdessä. Käytämme mapPartitionsia täältä löytääksesi rivien määrä meillä on tiedostossamme RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • vähentääBy ():

KäytämmeReduceBy() päällä Avainarvoparit . Käytimme tätä muutosta omassacsvtiedosto soittimen löytämiseksi otteluiden korkein mies .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (väärä) ManOTH.take (10) .foreach (println)

  • liitto():

Nimi selittää kaiken, käytämme ammattiliittojen muutosta klubi kaksi RDD: tä yhdessä . Tässä luomme kaksi RDD: tä, nimittäin fil ja fil2. fil RDD sisältää vuoden 2017 IPL-otteluiden tietueet ja fil2 RDD sisältää vuoden 2016 IPL-ottelutietueen.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Aloitetaan Toiminta osa, jossa näytämme todellisen tuotoksen:

  • kerätä():

Kerää on toiminta, jota käytämme näyttää sisällön RDD: ssä.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • Kreivi():

Kreivion toiminto, jota käytämme laskemaan tietueiden lukumäärä läsnä RDD: ssä.Tässäkäytämme tätä toimintoa laskeaksesi tietueiden kokonaismäärän match.csv-tiedostossamme.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • ota ():

Take on keräilyä muistuttava toiminto, mutta ainoa ero on se, että se voi tulostaa minkä tahansa valikoiva rivien lukumäärä käyttäjän pyynnön mukaan. Tässä käytämme seuraavaa koodia tulostamaan kymmenen johtavaa raporttia.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. ota (10). foreach (println)

  • ensimmäinen():

First () on toiminto, joka on samanlainen kuin kerätä () ja ottaa ()sekäytetään tulostamaan ylimmän raportin s ulostulo Tässä käytämme ensimmäistä () operaatiota tietyssä kaupungissa pelattujen otteluiden enimmäismäärä ja saamme Mumbain tuotokseksi.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val valtiot = CKfile.map (_. split (',') (2)) val Scount = tilat.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Tehdäkseen prosessistamme RDD-oppimisen Sparkin avulla vieläkin mielenkiintoisemman, olen keksinyt mielenkiintoisen käyttötapauksen.

RDD Spark: Pokemon -tapauksessa

  • Ensinnäkin Lataa meidän Pokemon.csv-tiedosto ja lataa se kipinän kuoreen kuten teimme Matches.csv-tiedostoon.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemoneja on tosiasiallisesti saatavana monenlaisina. Löytäkäämme muutama lajike.

  • Kaavan poistaminen Pokemon.csv-tiedostosta

Emme ehkä tarvitse sitä Kaavio Pokemon.csv-tiedostosta. Siksi poistamme sen.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Niiden määrän löytäminen osiot pokemon.csv-tiedostomme on jaettu.
println ('Osionumero =' + NoHeader.partitions.size)

  • Vesi Pokemon

Löytäminen vesipokemonien määrä

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Tuli Pokemon

Löytäminen Fire-pokemonien määrä

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Voimme myös havaita väestö erityyppisestä pokemonista käyttämällä laskutoimintoa
WaterRDD.count () FireRDD.count ()

  • Koska pidän pelistä puolustava strategia Anna meidän löytää pokemon kanssa maksimaalinen puolustus.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Korkein_Defence:' + defenceList.max ())

  • Tiedämme enimmäismäärän puolustusvoiman arvo mutta emme tiedä mikä pokemon se on. Joten, löydetään mikä on se pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Tilaus [Tupla] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Tehkäämme nyt pokemon kanssa vähiten puolustus
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

siirtää tiedostoja ec2 Windows -esiintymään
  • Katsokaamme nyt Pokemonia a vähemmän puolustava strategia.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemonNimi .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Tilaus [Tupla ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Joten tämän avulla olemme päättäneet tämän RDD: n käyttämällä Spark-artikkelia. Toivon, että sytytimme hieman valoa tietoonne RDD: stä, niiden ominaisuuksista ja erityyppisistä toiminnoista, joita heille voidaan suorittaa.

Tämä artikkeli perustuu on suunniteltu valmistautumaan Cloudera Hadoop- ja Spark Developer -sertifiointikokeeseen (CCA175). Saat syvällistä tietoa Apache Sparkista ja Spark-ekosysteemistä, johon kuuluvat Spark RDD, Spark SQL, Spark MLlib ja Spark Streaming. Saat kattavaa tietoa Scala-ohjelmointikielestä, HDFS: stä, Sqoopista, Flumeista, Spark GraphX: stä ja Messaging-järjestelmästä, kuten Kafka.