Kumulatiivinen tilanmuutos Apache Spark -suoratoistossa



Tässä blogiviestissä käsitellään Spark Streamingin tilanmuutoksia. Opi kaikki kumulatiivisesta seurannasta ja taitoja Hadoop Spark -uralle.

Lähettäjä Prithviraj Bose

Aikaisemmassa blogissani olen keskustellut tilallisista muutoksista käyttämällä Apache Spark Streaming -kuvaketta. Voit lukea sen tässä .



Tässä viestissä aion keskustella kumulatiivisista tilallisista toiminnoista Apache Spark Streaming -sovelluksessa. Jos olet uusi Spark Streaming -ohjelmassa, suosittelen vahvasti, että luet edellisen blogini ymmärtääksesi kuinka ikkuna toimii.

Tilaillisen muutoksen tyypit kipinän suoratoistossa (jatkuu ...)

> Kumulatiivinen seuranta

Olimme käyttäneet reducByKeyAndWindow (…) Sovellusliittymä avainten tilojen seuraamiseen, mutta ikkuna asettaa rajoituksia tietyille käyttötapauksille. Entä jos haluamme kerätä avainten tilan läpi sen sijaan, että rajoittaisimme sen aikaikkunaan? Siinä tapauksessa meidän olisi käytettävä updateStateByKey (…) ANTAA POTKUT.



Tämä sovellusliittymä otettiin käyttöön Spark 1.3.0: ssa ja se on ollut erittäin suosittu. Tällä API: lla on kuitenkin jonkin verran suorituskykyä, sen suorituskyky heikkenee tilojen koon kasvaessa ajan myötä. Olen kirjoittanut näytteen tämän API: n käytöstä. Löydät koodin tässä .

Spark 1.6.0 esitteli uuden sovellusliittymän mapWithState (…) joka ratkaisee esittämät suorituskyvyn yleiskustannukset updateStateByKey (…) . Tässä blogissa aion keskustella tästä nimenomaisesta sovellusliittymästä käyttämällä kirjoittamaani esimerkkiohjelmaa. Löydät koodin tässä .

Ennen kuin sukellan koodikävelyyn, säästetään muutama sana tarkistuspisteestä. Kaikissa tilallisissa muunnoksissa tarkistuspiste on pakollinen. Tarkistuskohdistus on mekanismi, jolla palautetaan avainten tila, jos ohjainohjelma epäonnistuu. Kun ohjain käynnistyy uudelleen, avainten tila palautetaan tarkistuspisteistä. Tarkistuspisteiden sijainnit ovat yleensä HDFS tai Amazon S3 tai mikä tahansa luotettava tallennustila. Koodia testattaessa voidaan tallentaa myös paikalliseen tiedostojärjestelmään.



kuinka määrittää eclipse Java - sovellukselle

Kuuntelemme esimerkkiohjelmassa socket-tekstivirtaa isännässä = localhost ja port = 9999. Se merkitsee saapuvan virran (sanat, esiintymien määrä) ja seuraa sanamäärää 1.6.0-sovellusliittymän avulla mapWithState (…) . Lisäksi avaimet, joilla ei ole päivityksiä, poistetaan StateSpec. Aikakatkaisu API. Tarkistamme HDFS: ssä ja tarkistuspisteiden taajuus on 20 sekunnin välein.

Luodaan ensin Spark Streaming -istunto,

Spark-streaming-session

Luomme tarkistuspiste HDFS: ssä ja kutsu sitten objektimenetelmää getOrCreate (…) . getOrCreate API tarkistaa tarkistuspiste nähdäksesi onko palautettavia aikaisempia tiloja, jos sellaisia ​​on, se luo Spark Streaming -istunnon uudelleen ja päivittää avainten tilat tiedostoihin tallennetuista tiedoista, ennen kuin jatkat uusien tietojen kanssa. Muuten se luo uuden Spark Streaming -istunnon.

getOrCreate ottaa tarkistuspisteen hakemiston nimen ja funktion (jonka olemme nimenneet createFunc ) jonka allekirjoituksen tulisi olla () => StreamingContext .

sql-palvelimen päivämäärätietotyyppi

Tarkastellaan sisällä olevaa koodia createFunc .

Rivi # 2: Luomme suoratoistokontekstin työn nimellä 'TestMapWithStateJob' ja eräajo = 5 sekuntia.

Rivi # 5: Aseta tarkistuspisteen hakemisto.

Rivi # 8: Aseta tilamääritys luokan avulla org.apache.streaming.StateSpec esine. Asetetaan ensin toiminto, joka seuraa tilaa, ja sitten asetamme seuraaville muunnoksille syntyvien DStreamien osioiden lukumäärän. Lopuksi asetamme aikakatkaisun (30 sekuntiin), jossa jos avaimen päivitystä ei saada 30 sekunnissa, avaimen tila poistetaan.

Rivi 12 #: Asenna pistorasian virta, tasoita saapuvat erätiedot, luo avain-arvo-pari, soita mapWithState , aseta tarkistusväli 20 sekunniksi ja tulosta lopuksi tulokset.

Spark-kehys kutsuu th e createFunc jokaiselle avaimelle, jolla on edellinen arvo ja nykyinen tila. Laskemme summan ja päivitämme tilan kumulatiivisella summalla ja lopuksi palautamme avaimen summan.

ohita viittaus java

Github-lähteet -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Onko sinulla kysymys meille? Mainitse se kommenttiosassa ja palaamme sinuun.

Aiheeseen liittyvät julkaisut:

Aloita Apache Spark & ​​Scalan käyttö

Tilanmuutokset ja peräkkäin kipinän suoratoistossa