Pri crawlovaní veľkého veľkého množstva stránok(desiatky miliónov až niekoľko miliárd) metódou pavúka sa často stáva, že sa do webového archívu dostanú niektoré informácie/články duplicitne. Pre ďalšie spracovanie dát nám stačí mať každú informáciu práve raz, aby sme ju nemuseli spracovávať niekoľko krát.
Zvyčajne to bývajú stránky:
Zo všetkých stránok si do ďalšieho spracovania chceme ponechať len to podstatné/unikátne.
Internet predstavuje enormné množstvo dát. Pre každú jednu stránku a jej časť je potrebné previesť overenie, či sa už daný článok/časť nachádza v našej databáze.
Deduplikácia je preto prevádzaná ako distribuovaný výpočet. Jednotlivé procesy sa delia na "server" a "worker".
Predstavuje databázu známych článkov/častí, ktoré sú ukladané v podobe hashu (20 miestne celé číslo). Ak je serverov viac, každý má na starosť len hashe v čiastkovom rozsahu. Jeho úlohou je spracovať požiadavku na overenie existencie daného hashu v databáze, ak už existuje, označí ho ako duplicitný. Ak je nový, označí ho ako unikátny a uloží do databázy. Databáza môže byť pri štarte servera načítaná zo súboru a pri jeho korektnom ukončení uložená.
Spracováva jednotlivé časti commoncrawlu vo vertikalizovanej(1 slovo na riadok, zbavené HTML) podobe. Ako prvú načíta hlavičku stránky. Overí, či sú URL s titulkom unikátne. Ak nie, spočíta hash celého článku. V prípade, že je celý článok duplicitný, rovno ho zahodí. Ak duplicitný nie je, prechádza ku deduplikácii obsahu. Text je delený na paragrafy. Paragrafy sú delené na krátky paragraf(kratší ako konštanta - zvyčajne 50 znakov) a dlhý paragraf. Krátky paragraf môže byť napríklad nadpis, alebo krátky popis. Ten môže byť vo veľa článkoch rovnaký a keďže nechceme z celého internetu odstrániť nadpis napr. "Fotogaléria", nebudeme ho deduplikovať a ponecháme si ho v bufferi. Ak narazíme na dlhý paragraf textu spočítame jeho hash a obrátime sa na odpovedajúci server. V prípade že je unikátny, ponecháme si ho. Ak nie, bude zahodený.
Týmto spôsobom každý worker postupne prejde všetkými súbormi vo vstupnej zložke. Výsledkom je jeden súbor *.dedup
ku každému vstupnému súboru, ktorý je zbavený duplicít vzhľadom na spracovávané dáta.
Všetky programy pre deduplikáciu sú v repozitári KNOTu vo vetve corpproc_dedup.
K deduplikci slouží programy dedup a server dostupné přeložením pomocí Makefilu ve složce:
[git::corpproc_dedup]/processing_steps/3/dedup/
Parametry:
./server -m=~/hashmap.conf [-i=INPUT_FILE] [-o=OUTPUT_FILE [-d]] [-s=STRUCT_SIZE] [-p=PORT] [-j=JOURNAL_FILE] [-k] [a] [-P=RPORT]
-i --input
- vstupní soubor -o --output
- výstupní soubor-h --help
- vypíše zoznam spúšťacích argumentov a ich použitie-p --port
- port serveru (výchozí je 1234)-s --size
- změna velikosti struktury pro uložení hashů (výchozí velikost je 300,000,000)-d --debug
- zároveň s výstupním souborem je generován soubor s debug výpisy-j --journal
- obnovenie hashov zo zálohy (použiť v kombinácii s -j u klienta)-k --keep
- ponechanie záložného súboru aj po úspešnom uložení hashov do výstupného súboru-m --map
- serializovaná mapa s rozložením hashov na serveroch-a --altered
- skontroluje mapu(--map) a odošle cudzie bloky novým vlastníkom. Nutné pri zmene mapy!-p --rport
- port služby pre migráciu hashov (predvolený je -p + 1)Server běží do doby než je "killnut". Reakcí na signály SIGHUP a SIGTERM je uložení hashů do výstupního souboru (pokud je zadána cesta). Signál SIGPIPE je ignorovaný. Ak je zadaná cesta pre uloženie hashov do súboru, server generuje žurnálový súbor. V ňom je zachytený každý prijatý hash pre prípad chyby, alebo pádu servera skôr, než by hashe boli uložené do výstupného súboru.
Pre obnovenie hashov zo súboru je potrebné spustiť server s parametrom -j=~/server_hash.backup
. Hashe budú zo žurnálu načítané spoločne so vstupným súborom (ak bol zadaný) a v prípade zadania výstupného súboru serializované ako výstupné dáta.
Po úspešnom ukončení servera je žurnálový súbor (*.backup) odstránený. Napr.
./server -m=~/hashmap.conf -i=~/server_hash -o=~/server_hash -j=server_hash.backup
Launch paramters:
./dedup -i=INPUT_DIR -o=OUTPUT_DIR -s=SERVERS_STRING [-p=PORT] [-t=THREADS] [-n] [-d] [-wl] [-uf=FILTER_FILE]
-i --input
- vstupní adresář se soubory určenými k deduplikaci-o --output
- výstupní adresář, pokud adresář neexistuje, skript se jej pokusí vytvořit-p --port
- port serveru (výchozí je 1234)-t --threads
- nastavuje počet vláken (výchozí je 6)-n --near
- použije algoritmus "nearDedup"-d --debug
- ke každému výstupnímu souboru .dedup je zároveň generován soubor .dedup.debug obsahující debug výpisy-wl --wikilinks
- deduplikace formátu WikiLinks-dr --dropped
- ke každému výstupnímu souboru .dedup je generován soubor .dedup.dropped obsahující odstraněné duplicity-f --feedback
- záznamy v souboru .dedup.dropped obsahující referenci na záznamy kvůli kterým byly vyloučeny (viz. níže) -dd --droppeddoc
- ku každému výstupnému súboru *.dedup je vytvorený súbor .dedup.dd obsahujúci zoznam URL adries kompletne vylúčených dokumentov-j --journal
- pokračovanie v deduplikácii po páde - spracované dokumenty budú preskočené, rozpracované budú dokončené (používa sa v kombinácii s -j u servera)-h --help
- vypíše zoznam vstupných argumentov-m --map
- konfiguračná mapa rozloženia hashovPro jejich jednodušší spouštění byly vytvořeny skripty server.py
a deduplicate.py
(popsány níže), které umožnují spustit deduplikaci paralelně na více strojích. Programy musí být předem rozdistribuovány na všechny použité stroje a na nich být ve stejném umístění například: /mnt/data/bin/dedup
Spektrum hashov je rozdelené do blokov (zvyčajne 1999 - počet je nutné určiť pred prvým spracovaním, potom nie je možné ho zmeniť).
Bloky sú pred prvým spustením rovnomerne rozdelené medzi servery pomocou programu distribute, ktorý vygeneruje konfiguračný súbor.
Program distribute sa zostavuje spoločne s workerom a serverom.
Použití:
./distribute [-s=SERVER_FILE] [-i=INPUT_FILE] [-o=OUTPUT_FILE][-b=BLOCK_COUNT] [-d] [-q] [-f] [-E=EXC_FILE]
-s --servers
- Zoznam hashholderov, 1 hostname na riadok (default:"./servers.txt")-o --output
- Zapíše konfiguráciu do výstupného súboru, v opačnom prípade bude vypísaná na STDOUT-b --blocks
- Počet blokov, do ktorých sa rozdelí spektrum hashu (default:1999)-d --debug
- Zapne ladiace výpisy - počet blokov na každom serveri-q --quick
- Vypne optimalizáciu rozloženia blokov. Ušetrí pár prenesených blokov. - v mnohých prípadoch môže zhoršiť rovnomerné rozdelenie a tým aj rýchlosť deduplikácie-f --full
- Hashe budú prerozdelené znova. Pri rozdelení ignoruje starú konfiguráciu. - vhodné pre vypísanie štatistiky a porovnanie koľko blokov (% blokov) bude presunutých-E --excluded
- Uloží do súboru zoznam vylúčených serverov. Zoznam sa použije pri spustení deduplikácie aby boli hashe z vylúčených serverov správne presunuté.Bloky sú na serveroch rozdelené podľa zvyškov po delení počtom serverov.
Pro 10 serverov:
blok 0, 10, 20, ... bude uložený na serveri s indexom 0. blok 1, 11, 21, ... bude uložený na serveri s indexom 1. ... blok 9, 19, 29, ... bude uložený na serveri s indexom 9.
Pri počte blokov 100 (<0,99>) bude každý server držať 10 blokov. Takéto rozdelenie nám umožňuje jednoducho pridať ďalší server. Nový server má index 10, teda priradíme mu každý blok, ktorého zvyšok po delení číslom 11(aktuálny počet serverov) je 10.
Sú to bloky: 10, 21, 32, 43, 54, 65, 76, 87 a 98
Všimnite si, že každému serveru (okrem serveru 9, lebo 11 nie je deliteľ čísla 100) sme ubrali jeden blok.
Nové rozloženie: servery 0..8 a 10 držia 9 blokov, server 9 drží 10 blokov (10 * 9 + 1 * 10 = 100)
Tu však prichádza problém, ak by sme chceli pridať nový server s indexom 11
Jeho bloky by boli: 11, 23, 35, 47, 59, 71, 83, 95
To by znamenalo, že serverom 1, 3, 5, 7 a 9 uberieme dva bloky a zvyšným nič. Počet blokov by síce sedel, ale rozloženie by nebolo rovnomerné.
Pri odobratí servera sa voľné bloky rovnomerne rozložia medzi ostatné servery. Rozhádže to však rozloženie podľa zvyškov po delení a pri ďalšom pridávaní servera by to bez použitia optimalizácie mohlo byť dosť nerovnomerné.
Aby sme predišli problému z predchádzajúcej kapitoly, ktorý pri viacnásobnej aplikácií môže spôsobiť aj viac ako dvojnásobný rozdiel v počte blokov medzi jednotlivými servermi, rozloženie optimalizuje.
V prípade, že server je zaťažený o 5% viac, ako by mal, bloky ktoré má navyše sa rozdelia medzi menej zaťažené servery. Použitie optimalizácie je možné vypnúť so spúšťacím argumentom -q (--quick). Pri bežnom použití sa to však neodporúča.
Pri pridávaní viacerých serverov je dobré zvážiť konkrétnu situáciu, vo väčšine prípadov stačí spustiť program bez prídavných prepínačov.
Pri veľkých zmenách (napríklad z 2 serverov na 10) je dobré skúsiť si viacero spôsobov.
Spustenie:
./distribute -i=dist.conf >/dev/null
Nám dá výstup:
Servers: 2 -> 10 Blocks to be redistributed: 1583 (79.1896%) Average aberrancy: 2 blocks(0.135068%) Max. aberrancy: 9 blocks(0.450225%)
Pričom s použitím parametra -f --full:
./distribute -f -i=dist.conf >/dev/null
Dostaneme výstup:
Servers: 2 -> 10 Blocks to be redistributed: 1599 (79.99%) Average aberrancy: 0 blocks(0.0450225%) Max. aberrancy: 1 blocks(0.050025%)
V tomto prípade je už rozumnejšie spraviť nové rozloženie (bez vstupného súboru alebo s použitím parametra -f --full
parameter).
Naopak pri menšej zmene napríklad z 10 na 11 serverov bez použitia parametra -f --full
dostaneme:
Servers: 10 -> 11 Blocks to be redistributed: 182 (9.10455%) Average aberrancy: 0 blocks(0.0363818%) Max. aberrancy: 1 blocks(0.050025%)
A s použitím parametra -f --full
:
Servers: 10 -> 11 Blocks to be redistributed: 1809 (90.4952%) Average aberrancy: 0 blocks(0.0363818%) Max. aberrancy: 1 blocks(0.050025%)
Čo by znamenalo, že namiesto 182 blokov by sme presunuli 1809.
Ak by sme teraz chceli pridať ďalšie servery, zakročila by optimalizácia:
Servers: 11 -> 14 Blocks to be redistributed: 497 (24.8624%) Average aberrancy: 3 blocks(0.196527%) Max. aberrancy: 7 blocks(0.350175%)
Ak z nejakého dôvodu chceme minimalizovať počet prenesených blokov (potrebujeme pridať servery a rýchlo to dokončiť), môžeme ju vypnúť parametrom -q --quick
./distribute -q -i=dist.conf >/dev/null
a ušestríme pár blokov na prenose:
Servers: 11 -> 14 Blocks to be redistributed: 367 (18.3592%) Average aberrancy: 22 blocks(1.12556%) Max. aberrancy: -50 blocks(-2.50125%)
Všimnite si však, že sa značne zvýšila priemerná aj maximálna odchýlka od rovnomerného rozdelenia. Na niektorom zo serverov je o 50 blokov menej!
Po vygenerovaní nového rozloženia je potrebné ho pripraviť na všetky servery. Ak boli nejaké servery vylúčené (-E v programe distribute), musíme tieto servery špecifikovať pri spúštaní serverov. Vylúčené servery sa spustia a odošlú hashe novým vlastníkom na základe rozloženia. Spustenie serverov s novým rozložením môže vyzerať napríklad takto (používa sa server.py
viď sekcia Spúšťače):
python3 server.py start -a -E ~/excluded.list -m ~/hashmap.conf -i /tmp/testdata/ -o /tmp/testdata -b ~/bin/server
Migrácia sa spustí automaticky. Skript pre spustenie workerov bude pred spustením spracovania čakať na jej ukončenie.
Server aj Worker majú implementovanú funkciu, ktorá umožňuje pokračovanie v deduplikácii po páde systému. Logy pre rozpracované vertikály v prípade workera a neuložené hashe v prípade servera sú vytvárané automaticky. Pre ich zohľadnenie pri spúšťaní deduplikácie je nutné použiť argument -r/--restore
(pozn. -j
v hash_redistribution) u workera a -j/--journal
u servera. Hotové deduplikované vertikály vo výstupnom priečinku budú preskočené. Rozpracované vertikály, pri ktorých sa nachádza súbor *.dedup.log (v prípade úspešného spracovania je odstránený) budú dokončené a ich log odstránený.
Dôvody pádu a ich riešenie:
Zlyhanie klienta (139 - SEG. FAULT) - nemalo by sa vyskytovať, ak sa objaví prosím nahláste detaily na xcubae00@stud.fit.vutbr.cz - Zlyhanie konkrétneho klienta, servery a ostatní klienti nie sú ovplyvnení a proces pokračuje - Možné klienta spustiť manuálne - "./dedup -i=INPUT_DIR -o=OUTPUT_DIR -s=zoznam bežiacich serverov oddelených medzerami" - Lepšie je však po skončení procesu opakovať spustenie s parametrom -j u servera a -r u klienta (stačí spustiť len konkrétneho klienta, zvyšní by mali všetky súbory preskočiť - vadiť by to nemalo) Zlyhanie servera (139 - SEG. FAULT) - vyskytlo sa pri obnovovaní hashov po páde zo žurnálu veľkých rozmerov - ešte sa to bude skúmať - opravovať - Pomohlo spustenie znova Terminácia klienta (143 - SIGTERM / 137 - SIGKILL / 129 - SIGHUP) - klient obdržal signál SIGTERM/SIGKILL/SIGHUP - ovplyvňuje ostatných klientov - ukončia sa signálom 141 (SIGPIPE) - Riešenie: spustiť znova - pri 129 použiť "nohup" resp. "disown -h" -(pri problémom so štartom serverov použiť iný port) Pád klienta (141 - SIGPIPE) - Klient sa ukončil kvôli pádu iného servera/klienta Problémy so spúšťaním servera - [FAILURE] hostname Exited with error code 1 - v server.py indikujú problém so vstupnými dátami - deduplicate.py cyklicky testuje stav serverov pred začatím procesu - kód 1 znamená, že daný server nie je pripravený. Môže to byť spôsobené tým že prebieha obnova hashov zo žurnálu, prípadne načítavanie hashov z veľkého vstupného súboru. Ak sa počet pripravených serverov dlhšiu dobu(>1min) nemení, je možné, že sa niektorý zo serverov nespustil úspešne. Problém zistíte pripojením na daný server a spustením "screen -r", čo vás prepne do konzoly servera. - Častý problém: bind() failed: Address already in use - na stroji zostali pozostatky z predchádzajúceho neúspešného behu. - Riešenie 1: počkať a skúsiť znova o chvíľu (ukladanie hashov môže trvať aj niekoľko minút!) - pokúsiť sa ukončiť servery cez server.py - Riešenie 2:(pozor na iné vaše procesy!) tvrdo ukončiť všetky vaše procesy na danom serveri - (pkill -SIGKILL -u USER) - Riešenie 3: Použiť iný port
Pri použití prepínača -dd
, *.dedup.dd bude vo výstupnom adresári generovaný súbor:
<dd url="https:/..." title="Some title" status="X"/>
X
môže byť:
K
(kept) - dokument nebol zmenený = je unikátnyD
(dropped) - dokument bol zahodený na základe duplicitného hashu celého dokumentuS
(step by step) - všetky paragrafy boli duplicitné = dokument bol zahodenýxK/yD
(kept/dropped) - časť dokumentu bola zahodená, x = počet ponechaných paragrafov, y = počet zahodených paragrafov (napr. 4K/2D)F
(filtered) - dokument bol zahodený na základe filtraŠtatistiky o dokumentoch teda možno zistiť napríklad takto:
grep "status=\"K\"" *vert.dedup.dd
alebo:
grep "status=\"*/*\"" *vert.dedup.dd
Deduplikácia URL a názvov dokumentov
Pre minimalizáciu relatívne časovo náročnej komunikácie workera a servera sa worker snaží vylúčiť duplicitné dokumenty už na začiatku. Kontrolovaný je názov dokumentu a URL, nedá sa však povedať, že všetky dokumenty s rovnakým názvom/url majú rovnaký obsah. Preto je taký dokument označený ako potenciálne duplicitný a celý načítaný do buffera. Následne sa spočíta hash celého obsahu buffera a odošle sa na vyhodnotenie serveru. Duplicitný dokument bude rovno zahodený, unikátny bude pokračovať v štandardným postupom deduplikácie.
Porovnanie časovej náročnosti
Ako testovacia konfigurácia boli použité 3 servery, 1× server(2 vlákna) a 2× worker(2×2 vlákna). Spracovávali sa vertikály commoncrawlu (CC-2015-40), 10 vertikálov veľkosti dokopy 1.1 GB na jedného workera, tzn. 2.2 GB dát celkovo.
S použitím filtra( viď. vyššie)) | Bez použitia filtra | Bez kontroly URL, titulkov a bufferovania | |
---|---|---|---|
Trvanie**: | 2m 54.187s | 3m 14.327s | 3m 10.427s |
Zahodené paragrafy**: | 1,328,824 | 1,354,775 | 1,354,775 |
Ponechané paragrafy: | 5,048,902 | 5,104,177 | 5,104,177 |
Zahodené titulky/URL: | 4,649 (filters) | 0 - dedup. within 1 CC | N/A |
Ponechané titulky/URL*: | 283,423 | 422,107 | N/A |
Poznámky:
*Ponechané titulky/URL sú následne klasicky deduplikované po paragrafoch.
**Výsledky sa môžu v jednotlivých meraniach pri zapojení viacerých strojov/vlákien mierne odlišovať, záleží na poradí spracovania hashov. Ponechané paragrafy a titulky/URL by však mali ostávať nezmenené.
Použitím viacerých vlákien vieme čas výpočtu skrátiť aj o viac ako 50 % (1m 21.932s pri 6 vláknach, 0m 44.167s pri 10 vláknach na workera). Potom je však vhodné uvažovať nad použitím viacerých serverov (predpokladaný vhodný pomer je približne 1:4).
Režim feedback:
Slouží ke kontrole správnosti deduplikace. Ke každému vyloučenému záznamu (.debug.dropped) je přidána informace o umístění duplikátního hashe. Tato informace se skládá že tří čísel. První udává o kolikátý záznam ze vstupního souboru se jedná. Druhé a třetí číslo je reference na duplicitní záznam. Druhé označuje vstupní soubor (hash jeho cesty) a třetí pak o kolikátý záznam z tohoto souboru jde. Formát je následující. Při deduplikaci odstavců je před každý odstavec(mimo krátkych odstavců) přídáno: <t>POSITION_OF_DROPPED \t POSITION_OF_DUPLICIT
</f>. U formátu Wikilinks jsou přidány dva sloupce, tedy formát je: POSITION_OF_DROPPED \t POSITION_OF_DUPLICIT \t ...
. Jak POSITION_OF_DROPPE
tak i POSITION_OF_DUPLICIT
se skládají ze dvou složek oddělenými dvojtečkou, tedy ve formátu FILE_HASH:OFFSET_IN_FILE
, kde FILE_HASH
je hash cesty vstupního souboru a OFFSET_IN_FILE
je pořadové číslo záznamu(počítáno od 1). Tento režim se zapíná přepínačem -f u workeru. Server není třeba nijak přepínat, sám rozezná podle množství obdržených dat, že si kromě hashe má uložit i jeho umístění. Tato umístění se neukládají do výstupního souboru, tedy po zabití serveru jsou ztraceny.
Skript pro kontrolu deduplikace
Skript vyžaduje aby byla deduplikace spuštěna s argumenty -f
and -dr
. Potom jsou vytvořeny .dropped soubory s referencí na duplicity. Skript pomocí těchto referencí vypíše pod sebou udajné duplicitní záznamy. skript potřebuje zadat argumentem -d
dropped soubor a -f
soubor, který odpovídá logu s deduplikace (potřebuje ho k tomu, aby přeložil hash názvu souboru na název souboru). Pro wikilinks formát použijte přepínač -wl
. Pomocí přepínače -n
lze nastavit přeskakování záznamů (vyhledat všechny záznamy je zdlouhavé), tedy -n 10
znamená, že se zkontroluje jen každý 10. záznam.
Launching server for deduplication
Skript s argumentem start
nejprve spouští screeny a v nich teprve samotné servery. Zadáním argumentu stop
naopak screeny ukončuje. Pokud není zadán ani start
ani stop
ani restart
, tak skript testuje nejprve zda běží screeny a poté také servery.
./processing_steps/3/server.py
Použití:
./server.py [start|stop|restart|migrate] -m MAP_FILE [-i INPUT_FILE] [-o OUTPUT_FILE [-d]] [-a] [-t THREADS] [-p PORT] [-e ERRORS_LOG_FILE] [-b BINARY] [-d] [-j] [-k] [-r SPARSEHASH_SIZE] [-P rport]
start
- Spustí serverystop
- Zastaví servery a počká na serializáciu hashovrestart
- Reštartuje serverymigrate
- Spustí migráciu hashov. Po ukončení migrácie zastaví servery.-i --input
- vstupní soubor-o --output
- výstupní soubor-t --threads
- počet vlákien obsluhujúcich workerov (default = 384) (pozn. odstránené v hash_redistribution)-p --port
- port serveru (výchozí je 1234)-r --resize
- změna velikosti struktury pro uložení hashů (výchozí velikost je 300000000)-e --errors
- pokud je nastaven jsou chyby logovány do tohoto souboru s aktuálním datem a časem-b --binary
- argument specifikující cestu k deduplikačnímu serveru (výchozí je /mnt/data/commoncrawl/corpproc/bin/server)-d --debug
- zároveň s výstupním souborem je generován soubor s debug výpisy, povoľuje core dump-j --journal
- obnovenie hashov zo žurnálového súboru ( použiť v kombinácii s -j u klienta)-k --keep
- ponechanie záložného súboru aj po úspešnom uložení hashov do výstupného súboru-v --valgrind
- Spustí server vo valgrinde - iba pre debugovanie v prípade núdze-P --rport
- Port služby pre migráciu hashov (predvolený je -p + 1)-E --excluded
- Zoznam vylúčených serverov (vygenerovaný distribučným programom) pre migráciu hashov-a --altered
- Povolí migráciu hashov. viď. -a pri server-m --map
- Mapa rozloženia hashovPoznámka:
-j, --journal: pre obnovenie hashov zo zálohy je potrebné uviesť cestu k vstupnému súboru, ktorý bol pred pádom servera zadaný ako výstupný. Skript overí, či sa pri ňom nachádza súbor "vstupný_súbor".backup. Ak vstupný súbor na nejakých serveroch neexistuje, bude vytvorený.
Examples:
./server.py start -m ~/hashmap # Spustí screeny a v nich servery na strojích určených souborem ~/hashmap # Servery čekají až se připojí workery ./server.py -m ~/hashmap # Otestuje zda jsou spuštěny screeny a servery na strojích určených souborem ~/hashmap ./server.py stop -m ~/hashmap -E ~/excluded.list # Ukončí screeny a v nich spuštěné servery na strojích určených souborem ~/hashmap a ~/excluded.list ./server.py migrate -m ~/hashmap -E ~/excluded.list -i ~/input.hash -o ~/output.hash # Spustí migráciu hashov podľa distribučnej mapy. Po ukončení migrácie zastaví servery a uloží hashe.
Spuštění workerů pro deduplikaci
Před tím je nutné spustit servery se stejnými parametry -s
a -p
beforehand.
./processing_steps/3/deduplicate.py
Usage:
./deduplicate.py -i INPUT_DIRECTORY -o OUTPUT_DIRECTORY -w WORKERS_FILE -m MAP_FILE [-p PORT] [-e ERRORS_LOG_FILE] [-b BINARY] [-n] [-d] [-wl]
-i --input
- vstupní adresář se soubory určenými k deduplikaci-o --output
- výstupní adresář, pokud adresář neexistuje, skript se jej pokusí vytvořit-w --workers
- soubor se seznamem workerů, kde je formát: HOSTNAME '\t' THREADS '\n' (jeden řádek pro jeden stroj) (pozn. pozor na nahrádzanie tabov medzerami, nebude fungovať)-p --port
- port serveru (výchozí je 1234)-e --errors
- pokud je nastaven jsou chyby logovány do tohoto souboru s aktuálním datem a časem-b --binary
- argument specifikující cestu k deduplikačnímu programu (výchozí je /mnt/data/commoncrawl/corpproc/bin/dedup)-t --threads
- nastavuje počet vláken (výchozí je 6, jsou-li počty vláken v souboru se seznamem serverů, mají vyšší prioritu)-n --near
- použije algoritmus "nearDedup"-d --debug
- ke každému výstupnímu souboru .dedup je zároveň generován soubor .dedup.dropped obsahující odstraněné duplicity a .dedup.debug obsahující debug výpisy-wl --wikilinks
- deduplikace formátu Wikilinks-dr --dropped
- ke každému výstupnímu souboru .dedup je zároveň generován soubor .dedup.dropped obsahující odstraněné duplicity-f --feedback
- záznamy v souboru .dedup.dropped obsahující referenci na záznamy kvůli kterým byly vyloučeny (viz. níže)-dd --droppeddoc
- Vo výstupnom adresári bude vytvorený súbor "droppedDocs.dd" obsahujúci zoznam kompletne vylúčených dokumentov-j --journal
- pokračovanie v deduplikácii po páde systému - spracované dokumenty budú preskočené, rozpracované budú dokončené (používať v kombinácii s -j na serveri)-m --map
- mapa rozloženia hashovDeduplikace formátu wikilinks je implementována tak, že se počítá hash pro konkatenaci sloupců 2, 3, 5, 6, tedy musejí byt všechny sloupce stejné, aby byl řádek vyhodnocen jako duplicitní. Neardedup funguje tak, že výpočet hashů pomocí N-gramů se provádí na konkatecaci sloupců 5, 3, 6 (v tomto pořadí), dále se počítá hash 2. sloupce a aby byl rádek duplicitní, musejí oba předchozí přístupy odhalit shodu.
Například:
./deduplicate.py -i /mnt/data/commoncrawl/CC-2015-18/vert/ -o /mnt/data/commoncrawl/CC-2015-18/dedup/ -w ~/workers -m ~/hashmap # Data z adresáře /mnt/data/commoncrawl/CC-2015-18/vert/ deduplikuje do adresáře /mnt/data/commoncrawl/CC-2015-18/dedup/ # Deduplikace probíhá na strojích specifikovaných v souboru ~/workers # Očekává, že na strojích určených souborem ~/hashmap je spuštěn server
Umístění
/processing_steps/3/scripts/dedup_check.py
Example:
dedup_check.py -s ~/servers -w ~/workers -i ~/CC-2015-35/vert/ -o ~/cc_test/output/ -sh ~/cc_test/server/
-i --input
- vstupný adresár s vertikálmi-o --output
- výstupný adresár s deduplikovanými vertikálmi-sh --serverhash
- výstupný adresár servera - adresár pre uloženie štruktúry hashov (--output pri TCP, --store pri MPI)-s --servers
- zoznam serverov, formát rovnaký ako pri deduplikácii = 1 hostname na riadok (iba TCP dedup)-w --workers
- zoznam workerov, formát rovnaký ako pri deduplikácii = 1 hostname na riadok (iba TCP dedup)-m --mpi
- prepínač na použitie pri MPI deduplikácii na salomone-h --help
- vypíše túto tabuľkuVýstup:
---------------- Worker knot35 ---------------- Worker running: YES Local input files: 717 Local missing files: 529 (73.77%) Local unfinished files: 6 ... ---------------- Server athena4 ---------------- Server running: YES OK ... ---------------- Summary ---------------- Total files: 33867 Missing files: 25185 (74.36%) Unfinished files: 276 ---------------- Total workers: 46 Workers running: 46 Workers finished: 0 Workers crashed: 0 ---------------- Total servers: 18 Servers running: 18 Servers finished: 0 Servers crashed: 0 Total journals found: 18
Zastavenie deduplikácie pomocou SIGKILL signal - pokračovanie možné za použitia prepínačov -j
(server) a -r
(client).
---------------- Worker athena7 ---------------- Worker running: NO WARNING: Worker crashed! Local input files: 728 Local missing files: 533 (73.21%) Local unfinished files: 6 ... ---------------- Server knot26 ---------------- Server running: NO Warning: Server crashed! - 1 journals found ---------------- Server knot30 ---------------- Server running: NO Server finished! ... ---------------- Summary ---------------- Total files: 33867 Missing files: 25092 (74.08%) Unfinished files: 275 ---------------- Total workers: 46 Workers running: 0 Workers finished: 0 Workers crashed: 46 ---------------- Total servers: 18 Servers running: 0 Servers finished: 16 Servers crashed: 2 Total journals found: 2
Pomocou tohoto skriptu overime správnu funkčnost skriptu:
testing/dedup_test.py
RSpustí test deduplikácie zadaných serveroch a vstupných dátach:
-s --servers
- Súborom so zoznamom serverov, 1 hostname na riadok. Default = ./servers.txt-i --input
- Zložka so vstupnými dátami. Default = /mnt/data/commoncrawl/CC-2015-40/vert/-b --bin
- Zložka deduplikácie (processing_steps/3/). Default = ../-d --debug
- Zapne debugovacie výpisy a core-dumping servera i workera.-c --clean
- make clean + nakopírovanie vstupných dát - nutné pri prvom spusteníPríklad (a odporúčený spôsob spúšťania):
python3 dedup_test.py -c
Parameter -c stačí pri prvom behu.
Deduplikačná knižnica (ďalej len libdedup) umožňuje používať deduplikáciu objektovo priamo z prostredia Python.
Zdrojové súbory:
git:[corpproc_dedup]:/processing_steps/3/libdedup
Poznámka:
Nepoužívajte libdedup v zdieľanom home na minerva1, použite lokálne úložisko!
Debian based (Ubuntu) | RPM based (Fedora/CentOs) |
---|---|
gcc, g++ | gcc, g++ |
libglib2.0-dev >= 2.48* | glib2-devel |
libgirepository1.0-dev | gobject-introspection-devel |
libboost-system-dev, libboost-filesystem-dev | boost-devel |
pssh, screen, python | pssh, screen, python |
Poznámka:
*Na serveroch KNOT je nainštalované Ubuntu 14.04.5, ktoré poskytuje verziu 2.40, verzia 2.48.2 je pre použitie knižnice nainštalovaná automaticky použitím autobuild.sh
Ak si nie ste istý verziou knižnice glib2, použite príkaz:
bash autobuild.sh
v priečinku processing_steps/3/libdedup
.
Knižnica glib2 bude v prípade nedostatočnej verzie skompilovaná a použitá pre zostavenie libdedup.
Na novších verziách distribúcie (s knižnicou libglib-2.0-dev >= 2.48) je možné taktiež použiť:
bash build.sh
v priečinku libdedup/src
.
Príklad reálneho použitia nájdete v:
libdedup/test.py
Pre priame použitie v Pythone je možné použiť skript:
libdedup/load.sh
Skript nastaví premenné prostredia a spustí interpretér Python.
Pre použitie s vlastným skriptom je potrebné nastaviť premenné prostredia:
export PYTHONPATH=processing_steps/3/libdedup/ export GI_TYPELIB_PATH=processing_steps/3/libdedup/src/ export LD_LIBRARY_PATH=processing_steps/3/libdedup/src/
Cestu upravte podľa potreby.
Knižnicu importujeme pomocou:
import dedup
Vytvoríme nový objekt dedup:
Argumenty a ich predvolené hodnoty pri zavolaní dedup.dedup():
bin_path = /mnt/data/commoncrawl/libdedup/bin/
- zložka s binárnym súborom server (zhodný s TCP/IP dedup)hash_path = /mnt/data/commoncrawl/libdedup/hashes/
- zložka pre načítanie a uloženie hashov load = True
- server pri spustení načíta hashe zo súboru (uloženie je implicitné)near = False
- použi algoritmus neardedupmapFile= ./hashmap.conf
- cesta k distribučnej mapeexcluded = None
- zoznam vylúčených serverov pre migráciuport = 1237
- port serverarport = 1238
- port pre migráciu hashovmigration = False
- povolí migráciu hashovdebug = False
- povolí debugovacie výpisydropped = False
- uloží zahodené paragrafy a dokumenty do vert.dropServery sa spustia pomocou:
de.start_servers()
Ak funkcia vráti 0, štart bol úspešný. Servery zostanú bežať aj po ukončení Pythonu. Preto je dôležité ich po ukončení práce správne ukončiť metódou:
de.stop_servers()
Ak je zastavenie úspešné - teda servery boli na danom porte spustené, vráti 0. V prípade, že sa chcete pripojiť už na bežiace servery, stačí inicializovať objekt dedup s rovnakými argumentami ako pri predchádzajúcom spustení.
Funkčnosť serverov si môžete overiť metódou:
de.test_servers()
Ak servery bežia a migrácia hashov je ukončená, vráti 0.
Workera vytvoríme ako::
worker = de.create_worker()
Metóda otestuje servery a vytvorí workera, ktorý sa na ne pripojí. V tomto momente worker očakáva vstup vo forme vertikalizovaného článku uloženého v reťazci - string.
Článok deduplikujeme ako:
vert = worker.process(DATA)
Výstup je vo forme objektu Vert, ktorý má atriubúty:
vert.result
- číselný kód reprezentujúci výsledok
vert.data
- reťazec reprezentujúci deduplikovaný dokumentvert.dropped
- počet zahodených paragrafov
vert.kept
- počet ponechaných paragrafovvert.drop
- obsahuje vylúčené paragrafy (ak bolo pri inicializácii zvolené dropped=True, inak NULL)Workerov je možno vytvoriť viacero, aj vo viacerých vláknach. Objekty Vert a Worker môžeme jednoducho zmazať pomocou del vert
resp. del Worker
.
Pre deduplikovanie vertikálov obsahujúcich veľké množstvo článkov použite klasickú deduplikáciu.
Architektúra superpočítača Salomon sa od serverov KNOT značne líši. Servery sú prepojené pomocou technológie InfiniBand a na výpočty sa používa prostredie OpenMPI.
Do OpenMPI prostredia sa nedá tak jednoducho vstúpiť a pripojiť sa na servery, ako je tomu pri TCP/IP deduplikačnej knižnici(vytvorenie nového workera).
OpenMPI si môžeme predstaviť ako uzatvorené prostredie, v ktorom sa nachádza niekoľko workerov, hashholderov a dve jadrá, ktoré sa starajú o réžiu. Jedno jadro číta vertikály zo STDIN a rozdeľuje ich workerom, druhé zbiera deduplikované vertikály a ukladá ich do súborov.
Inicializácia a komunikácia s prostredím deduplikácie je zastrešená nadstavbou mpidedup.py
, ktorá obsahuje vstupnú frontu. Dokument sa zapíše na vstupnú frontu, odkiaľ si ho vyzdvihne zapisovacie vlákno a pošle ho do OpenMPI prostredia.
Skripty:
processing_steps/salomon/mpidedup/lib
Preklad
Binárny súbor je vytvorený spolu s deduplikáciou pre salomon spustením make v zložke processing_steps/salomon/mpidedup
a umiestnený do zložky lib
.
Rozdelenie jadier Salomonu na workerov a hashholderov je možné určiť pri štarte. V prípade použitia distribučnej mapy sa počet hashholderov určí automaticky. Minimálny počet jadier je počet_hashholderov + 3
.
Pri vytvorení objektu triedy mpidedup
je možné nastaviť následujúce parametre:
output_dir
- povinný - zložka kde sa uložia deduplikované vertikálybin_dir
- zložka obsahujúca binárny súbor libmpidedup store_dir
- zložka na načítanie a uloženie hashovworkers
- počet workerovdebug
- zapnúť ladiace výpisydropped
- generuje súbory *.dropped obsahujúce zahodené paragrafydroppeddoc
- generuje súbory *.dd obsahujúce zoznam spracovaných dokumentovmap_path
- cesta ku konfiguračnej mape rozloženia hashov$ module load Python/3.6.1 $ module load Boost/1.59.0-intel-2016.01 $ module load OpenMPI/1.8.8-GNU-4.9.3-2.25 $ python3 >>> import mpidedup de = mpidedup.mpidedup(output_dir, bin_dir="~/bin/", dropped=True, droppeddoc=True) de.start() #de.queue = input queue de.queue.put(vert) de.finish() # deduplikácia sa po poslednom dokumente na vstupe ukončí, blokuje do ukončenia deduplikácie #de.stop() # okamžite ukončí deduplikáciu - dokumenty na vstupe zostanú nespracované
Pre príklad použitia libmpidedup si pozrite súbory lib/libmpidedup_test.py
a lib/run_test_salomon.sh
.
Spustiť test na salomone je možné pomocou:
qsub run_test_salomon.sh
Distributing a document into files
Pre zachovanie pôvodného rozdelenia vertikálov do súborov je potrebné do hlavičiek dokumentov pridať meno súboru v tvare:
vert_source="filename" Napíklad: <doc vert_source="1443736676547.12_20151001215756-00001.vert" id="..." url="..." title="...">
Ak nie je pôvod dokumentov špecifikovaný, deduplikačná knižnica ich rozdelí automaticky.