While crawling a large number of websites (thousands of millions, up to a couple of billions) using the Spider method, the web archive frequently ends up containing duplicate information. For further processing we only need one record of each piece of information, so that the same content is not processed multiple times.
The duplicates usually tend to be entire websites. We want to keep only the essential/unique information from each website for further processing.
The internet represents an enormous amount of data. For each website, it is necessary to check whether or not a specific article/part is already in our database.
For this reason, deduplication is executed as a distributed computation. The individual processes are divided into "server" and "worker" roles.
The server represents a database of known articles/parts, which are stored as hashes (20-digit integers). If there are multiple servers, each server is responsible only for a small part of the hash space. Its only task is to process requests checking whether a particular hash already exists in the database; if so, it is reported as a duplicate. If it is new, it is reported as unique and saved to the database. The database can be loaded from a file during server start-up and saved after successful termination.
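The check-or-insert logic can be sketched as follows. This is a minimal illustration assuming a plain in-memory set; the real server keeps hashes in a preallocated structure (see -s/--size below) and answers requests over TCP.

# Minimal sketch of the server-side check-or-insert step (illustration only).
known_hashes = set()

def check_or_insert(h):
    # Returns True if the hash is new (unique), False if it is a duplicate.
    if h in known_hashes:
        return False        # duplicate - already seen
    known_hashes.add(h)     # new hash - remember it for future requests
    return True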
The worker processes parts of Common Crawl in vertical format (one word per line, HTML free). The header of a website is loaded first, and the worker checks whether the URL together with the title is unique. If not, it calculates the hash of the whole article; if the whole article is a duplicate, it is excluded. If it is not a duplicate, deduplication of the content begins. The text is divided into paragraphs, and paragraphs are classified as short (shorter than a constant, usually 50 characters) or long. A short paragraph can be, for example, a title or a short description. It can be identical in many articles, and we do not want to remove a title (such as "Gallery") from the whole Internet, so it is kept in a buffer. When a long paragraph of text is encountered, its hash is calculated and the corresponding server is contacted. If it is unique, it is kept; if not, it is thrown away.
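The paragraph-level decision described above can be sketched as follows. is_duplicate() is a hypothetical stand-in for the request to the server that owns the hash, Python's hash() stands in for the real 20-digit hash, and only the 50-character limit is taken from the text above.

# Sketch of the worker's paragraph-level decision (simplified).
SHORT_PARAGRAPH_LIMIT = 50   # shorter paragraphs are never dropped, only buffered

def dedup_paragraphs(paragraphs, is_duplicate):
    kept = []
    for p in paragraphs:
        if len(p) < SHORT_PARAGRAPH_LIMIT:
            kept.append(p)                  # short paragraph (e.g. a title) - keep
        elif not is_duplicate(hash(p)):     # long paragraph - ask the responsible server
            kept.append(p)                  # unique - keep
        # duplicate long paragraphs are thrown away
    return kept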
In this manner each worker step by step goes through every file in the input directory. The result is one *.dedup file for each input file, which is duplicate free with regard to the data processed so far.
All programs for deduplication are in the KNOT repository on the branch corpproc_dedup.
The programs dedup and server are used for deduplication and are available after compiling via the Makefile in the directory:
[git::corpproc_dedup]/processing_steps/3/dedup/
Launch parameters:
./server -m=~/hashmap.conf [-i=INPUT_FILE] [-o=OUTPUT_FILE [-d]] [-s=STRUCT_SIZE] [-p=PORT] [-j=JOURNAL_FILE] [-k] [-a] [-P=RPORT]
-i --input - input file
-o --output - output file
-h --help - print help
-p --port - server port (default 1234)
-s --size - change size of the structure for saving hashes (default size is 300,000,000)
-d --debug - a debug (log) file is generated simultaneously with the output file
-j --journal - recovery of hashes from a journal file (use with -j at the worker)
-k --keep - the journal file is archived after hashes are successfully saved to the output file
-m --map - hash distribution map
-a --altered - the distribution map changed, enable hash migration
-P --rport - port for hash migration

The server runs until it is "killed". Hashes are saved to the output file (if a path is specified) as a reaction to the SIGHUP and SIGTERM signals. The SIGPIPE signal is ignored. The server also generates a journal file if a path to the file for hashes is specified. Each received hash is recorded there in case of an error or a server crash before all hashes could be saved to the output file.
To recover hashes from the journal it is necessary to launch the server with the parameter -j=~/server_hash.backup. Hashes will be loaded along with the input file (if specified), and if the output file is specified, the recovered hashes will be saved as output data.
After the server is terminated successfully, the journal file (*.backup) is removed. For example:
./server -m=~/hashmap.conf -i=~/server_hash -o=~/server_hash -j=server_hash.backup
Launch parameters:
./dedup -i=INPUT_DIR -o=OUTPUT_DIR -s=SERVERS_STRING [-p=PORT] [-t=THREADS] [-n] [-d] [-wl] [-uf=FILTER_FILE]
-i --input - input folder with files for deduplication
-o --output - output folder; if it does not exist, the script attempts to create it
-p --port - server port (default 1234)
-t --threads - sets the number of threads (default 6)
-n --near - uses the "nearDedup" algorithm
-d --debug - for every output file .dedup a file .dedup.debug containing debugging logs is generated
-wl --wikilinks - deduplication of the WikiLinks format
-dr --dropped - for each output file .dedup a file .dedup.dropped containing deleted duplicates is generated
-f --feedback - records in the file .dedup.dropped contain a reference to the records responsible for their elimination (more below)
-dd --droppeddoc - for each output file *.dedup a file .dedup.dd containing the list of URL addresses of completely eliminated documents is generated
-j --journal - continue deduplication after a system crash: already processed files are skipped, unfinished ones are finished (use with -j on the server)
-h --help - print help
-m --map - hash distribution map

The scripts server.py and deduplicate.py were created for easier launching on multiple machines in parallel. The programs have to be pre-distributed on all used machines and have to be in the same location, e.g.: /mnt/data/bin/dedup
Hashes are divided into blocks (1999 by default; the number needs to be specified before the first processing and cannot be changed afterwards). Before the first launch the blocks are distributed equally among the servers using the distribute program, which generates a config file.
The distribute program is used together with the worker and the server.
Usage:
./distribute [-s=SERVER_FILE] [-i=INPUT_FILE] [-o=OUTPUT_FILE] [-b=BLOCK_COUNT] [-d] [-q] [-f] [-E=EXC_FILE]
-s --servers - hashholder list file, 1 hostname per line (default: "./servers.txt")
-o --output - saves the configuration to the output file (default: STDOUT is used)
-b --blocks - number of hash blocks (default: 1999)
-d --debug - enable debug logs
-q --quick - turns distribution optimization off; minimizes transferred blocks, but may compromise an even distribution
-f --full - ignore the old configuration (only for statistics)
-E --excluded - saves the list of unused servers into a file if some servers were removed; this list must be added to the server launcher configuration for proper hash migration

The block distribution is based on the remainder of the block number divided by the number of servers.
Example for 10 servers:
blocks 0, 10, 20, ... are distributed to the server with index 0
blocks 1, 11, 21, ... are distributed to the server with index 1
...
blocks 9, 19, 29, ... are distributed to the server with index 9
For 100 blocks (0-99) each server will carry 10 blocks. This scheme allows us to add more servers. A new server would have index 10, therefore each block with a remainder of 10 after dividing by 11 (the current number of servers) will be moved there.
blocks: 10, 21, 32, 43, 54, 65, 76, 87 and 98
Note that every server except server 9 (and the new server 10) lost a block.
New distribution: servers 0..8 and 10 carry 9 blocks each, server 9 carries 10 blocks (10 * 9 + 1 * 10 = 100).
However, this scheme causes an issue if we add another server, with index 11:
blocks on server 11: 11, 23, 35, 47, 59, 71, 83, 95
This would mean that servers 1, 3 and 5 lose two blocks each, while servers 7 and 9 lose only one. The total number of blocks would remain the same, but they would no longer be equally distributed among all the servers.
When a server is removed, its blocks are redistributed equally among the remaining servers using the same method. This means that when a server is added again, the distribution could become unequal without optimization.
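A minimal sketch of the base modulo assignment described above (before the optimization discussed below); the block and server counts match the example, everything else is illustrative.

# Sketch of the modulo-based block assignment (no optimization).
# Initially block b belongs to server b % server_count; when a new server with
# index N is added, only blocks with b % (N + 1) == N migrate to it.
def initial_assignment(block_count, server_count):
    return {b: b % server_count for b in range(block_count)}

def add_server(assignment, new_index):
    for b in assignment:
        if b % (new_index + 1) == new_index:
            assignment[b] = new_index       # only these blocks migrate
    return assignment

owners = initial_assignment(100, 10)        # 100 blocks, servers 0..9
owners = add_server(owners, 10)             # blocks 10, 21, 32, ... move to server 10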
To prevent the issue mentioned above, the distribution is optimized. If a server's load is at least 5% higher than it should be, its excess blocks are redistributed among the servers with a lower load. Optimization can be disabled using the -q --quick argument at launch; this is not advised for everyday use.
In most cases you can run the program without additional arguments; however, it is advisable to think the situation through when adding multiple servers.
There are multiple ways to make extensive changes (e.g. 2->10 servers).
Launch:
./distribute -i=dist.conf >/dev/null
Output:
Servers: 2 -> 10
Blocks to be redistributed: 1583 (79.1896%)
Average aberrancy: 2 blocks (0.135068%)
Max. aberrancy: 9 blocks (0.450225%)
Launch with the parameter -f --full:
./distribute -f -i=dist.conf >/dev/null
Output:
Servers: 2 -> 10
Blocks to be redistributed: 1599 (79.99%)
Average aberrancy: 0 blocks (0.0450225%)
Max. aberrancy: 1 blocks (0.050025%)
In this case, it is advised to perform a new distribution (without an input file, or using the -f --full parameter).
Output when changing the number of servers from 10 to 11 without the -f --full parameter:
Servers: 10 -> 11
Blocks to be redistributed: 182 (9.10455%)
Average aberrancy: 0 blocks (0.0363818%)
Max. aberrancy: 1 blocks (0.050025%)
Output with the -f --full parameter:
Servers: 10 -> 11
Blocks to be redistributed: 1809 (90.4952%)
Average aberrancy: 0 blocks (0.0363818%)
Max. aberrancy: 1 blocks (0.050025%)
This would mean moving 1809 blocks instead of 182.
If we try to add more servers now, the optimization will take effect:
Servers: 11 -> 14
Blocks to be redistributed: 497 (24.8624%)
Average aberrancy: 3 blocks (0.196527%)
Max. aberrancy: 7 blocks (0.350175%)
If we want to minimize the number of transferred blocks, we can disable the optimization using the -q --quick parameter:
./distribute -q -i=dist.conf >/dev/null
Output:
Servers: 11 -> 14
Blocks to be redistributed: 367 (18.3592%)
Average aberrancy: 22 blocks (1.12556%)
Max. aberrancy: -50 blocks (-2.50125%)
Note that both the average and the maximum aberrancy have increased significantly.
When a new distribution is generated, it is necessary to set all servers up. If any server is excluded (-E in the distribute program), these servers need to be specified at launch. Excluded servers will start and send their hashes to the new owners according to the distribution. Launching servers with the new distribution can be done using the following command (server.py is used here, see Launchers):
python3 server.py start -a -E ~/excluded.list -m ~/hashmap.conf -i /tmp/testdata/ -o /tmp/testdata -b ~/bin/server
Migration is launched automatically. Worker launch script will wait until the migration ends.
Both server and worker have a function that allows them to continue deduplication after a system crash. Logs for interrupted verticals (worker) and unsaved hashes (server) are created automatically. To take these into account when restarting deduplication, it is necessary to use the argument -r --restore at the worker and -j --journal at the server. Finished deduplicated verticals in the output directory will be skipped. Unfinished ones, which still have a *.dedup.log file (removed on successful processing), will be finished and their log will be removed.
Crash causes and possible solutions:
Client failure (139 - SEG. FAULT)
- should not occur; if it does pop up, please report the circumstances of the crash to xcubae00@stud.fit.vutbr.cz
- failure of a specific client does not influence the servers and the other clients
- it is possible to launch the client manually: "./dedup -i=INPUT_DIR -o=OUTPUT_DIR -s=list of running servers separated by spaces"
- it is advised to repeat the launch with the parameters -j (server) and -r (worker) when the process finishes (just restart the specific client; the rest should skip each file, so it should not matter)

Server failure (139 - SEG. FAULT)
- occurred while recovering hashes after a crash from a very large journal; will be investigated further and fixed
- relaunching helps

Client termination (143 - SIGTERM / 137 - SIGKILL / 129 - SIGHUP)
- the client received the signal SIGTERM/SIGKILL/SIGHUP
- influences the other clients, which terminate with signal 141 (SIGPIPE)
- solution: relaunch; for 129 use "nohup" or "disown -h" (use another port if problems occur during launch)

Client crash (141 - SIGPIPE)
- the client terminated because of a failure of another server/client

Problems with server launch - [FAILURE] hostname Exited with error code 1
- issues with data are indicated in server.py
- deduplicate.py cyclically tests the status of the servers before the start of processing; code 1 means that a particular server is not ready. This can be caused by an ongoing recovery of hashes from a journal, or by loading hashes from a large input file. If the number of ready servers does not change for a longer time (>1 min), it is possible that some servers were not launched successfully. The problem can be discovered by connecting to the particular server and running "screen -r", which switches to the server console.
- a common problem is bind() failed: Address already in use - files from an unsuccessful run remain on the machine
  - Solution 1: hold on and try again in a bit (saving hashes may take a couple of minutes!), try to terminate the servers using server.py
  - Solution 2: (be cautious, other processes may still be running!) terminate all of your processes on the particular server (pkill -SIGKILL -u USER)
  - Solution 3: use another port
When using the parameter -dd, a *.dedup.dd file with the following format will be created:
<dd url="https:/..." title="Some title" status="X"/>
X can be:
K (kept) - document wasn't changed = it is unique
D (dropped) - dropped due to a duplicate hash of the whole document
S (step by step) - all paragraphs were duplicates = document was dropped
xK/yD (kept/dropped) - some of the paragraphs were dropped, x = kept paragraphs, y = dropped paragraphs (e.g. 4K/2D)
F (filtered) - dropped due to a filter

For example, you can check document stats using:
grep "status=\"K\"" *vert.dedup.dd
or:
grep "status=\"*/*\"" *vert.dedup.dd
Deduplication of URLs and document names
To minimize the relatively time-consuming communication between worker and server, the worker tries to exclude duplicate documents at the very beginning. The file name and URL are checked; however, it cannot be assumed that every document with the same name/URL has exactly the same content. For this reason the document is only tagged as a potential duplicate and the entire document is loaded into a buffer. The hash of the entire buffer content is then calculated and sent to the server for evaluation. A duplicate document is dropped, a unique one continues with the standard deduplication procedure.
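The pre-check flow can be sketched as follows; is_known() is a hypothetical stand-in for the server request and Python's hash() stands in for the real hash, so this only illustrates the control flow, not the real protocol.

# Sketch of the URL/title pre-check followed by the whole-document hash check.
def precheck_document(url, title, body, is_known):
    if not is_known(hash((url, title))):
        return "unique"       # URL+title never seen before - no need to buffer
    # potential duplicate: the whole document is buffered and hashed
    if is_known(hash(body)):
        return "dropped"      # identical document already processed
    return "continue"         # same URL/title but different content -
                              # continue with standard paragraph deduplication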
Comparison of time spent
The test configuration was 3 servers - 1× server (2 threads) and 2× worker (2×2 threads). Common Crawl verticals (CC-2015-40) were processed: 10 verticals totalling 1.1 GB per worker, i.e. 2.2 GB of data in total.
| | With filter (see above) | Without filter | Without checking URL, titles and buffering |
|---|---|---|---|
| Duration** | 2m 54.187s | 3m 14.327s | 3m 10.427s |
| Paragraphs thrown away** | 1,328,824 | 1,354,775 | 1,354,775 |
| Retained paragraphs | 5,048,902 | 5,104,177 | 5,104,177 |
| Titles/URL thrown away | 4,649 (filters) | 0 - dedup. within 1 CC | N/A |
| Retained titles/URL* | 283,423 | 422,107 | N/A |
Notes:
*Retained titles/URL are usually deduplicated at the paragraph level.
**Results can slightly differ between measurements when more machines/threads are used; it depends on the order of hash processing. The numbers of retained paragraphs and titles/URL should remain unchanged, though.
By using multiple threads the computation time can be reduced by more than 50 % (1m 21.932s with 6 threads, 0m 44.167s with 10 threads per worker; times are without filtering). It is then appropriate to consider using multiple servers (the presumed suitable ratio is approximately 1:4).
Feedback mode
It is used to check the validity of deduplication. To each excluded record (in .dedup.dropped) information about the location of the duplicate hash is added. This information consists of 3 numbers. The first says which record of the input file it is. The second and third number form a reference to the duplicate record: the second identifies the input file (as a hash of its path) and the third says which record of that file it is. The following format is used (added before each paragraph, except for short paragraphs): <t>POSITION_OF_DROPPED \t POSITION_OF_DUPLICIT</f>. The Wikilinks format uses 2 extra columns. Both POSITION_OF_DROPPED and POSITION_OF_DUPLICIT consist of 2 parts separated by a colon, in the format FILE_HASH:OFFSET_IN_FILE, where FILE_HASH is the hash of the input file path and OFFSET_IN_FILE is the number of the record (starting with 1). This mode can be enabled at worker launch using the -f parameter. The servers do not need any additional parameters; they automatically detect whether they should save the location of the hash in addition to the hash itself. The locations are not saved to the output file, which means that they are lost when the server is terminated.
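Parsing such a reference can be sketched as follows, assuming only the FILE_HASH:OFFSET_IN_FILE format described above; the concrete values are illustrative.

# Parse a feedback reference of the form FILE_HASH:OFFSET_IN_FILE.
def parse_position(ref):
    file_hash, offset = ref.split(":")
    return int(file_hash), int(offset)      # record numbering starts at 1

dropped_at = parse_position("12345678901234567890:42")     # illustrative values
duplicate_at = parse_position("98765432109876543210:7")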
Script checking deduplication
The script requires deduplication to be launched with the arguments -f and -dr. The .dropped files with references to duplicates are then created, and the script uses these references to print a list of the supposed duplicate records. The script requires a dropped file (parameter -d) and a file (parameter -f) corresponding to the deduplication logs. To use the Wikilinks format, launch it with the switch -wl. Records can be skipped using the -n switch (going through all records takes too long), so -n 10 means that only every tenth record will be checked.
Launching server for deduplication
The script with the argument start first launches screens and then servers inside the screens. If the argument stop is entered, the script closes the screens. If none of start, stop, restart is entered, the script first tests whether the screens are running, and then the servers.
./processing_steps/3/server.py
Usage:
./server.py [start|stop|restart|migrate] -m MAP_FILE [-i INPUT_FILE] [-o OUTPUT_FILE [-d]] [-a] [-t THREADS] [-p PORT] [-e ERRORS_LOG_FILE] [-b BINARY] [-d] [-j] [-k] [-r SPARSEHASH_SIZE] [-P rport]
start - start servers
stop - stop servers, wait for hash serialization
restart - restart servers
migrate - migrate hashes and exit
-i --input - input file
-o --output - output file
-t --threads - number of threads for workers (default = 384)
-p --port - server port (default 1234)
-r --resize - change size of the structure for saving hashes (default size is 300,000,000)
-e --errors - if set, errors are logged into this file with the current date and time
-b --binary - path to the deduplication server binary (default is /mnt/data/commoncrawl/corpproc/bin/server)
-d --debug - a file containing debug logs is generated simultaneously with the output file
-j --journal - recover hashes from a journal file (suggested to use with -j at the client)
-k --keep - archive the journal file after hashes are successfully saved to the output file
-v --valgrind - runs the server in valgrind (emergency debugging)
-P --rport - port for the hash migration service (default is -p + 1)
-E --excluded - list of excluded servers for hash migration (old -> new)
-a --altered - enables hash migration, see -a at the server
-m --map - hash distribution map

Note:
-j, --journal: to recover hashes from the journal it is necessary to enter the path to the input file that was specified as the output file before the server crash. The script checks whether "input_file".backup exists. If the input file does not exist on any of the servers, it will be created.
Examples:
./server.py start -m ~/hashmap
# Launches screens and servers on the machines specified by the file ~/hashmap
# Servers then wait for workers to connect

./server.py -m ~/hashmap
# Tests whether the screens and servers on the machines specified by the file ~/hashmap are running

./server.py stop -m ~/hashmap -E ~/excluded.list
# Closes screens and servers on the machines specified by the files ~/hashmap and ~/excluded.list

./server.py migrate -m ~/hashmap -E ~/excluded.list -i ~/input.hash -o ~/output.hash
# Launches hash migration according to the hash map. Servers are stopped and hashes are saved when the migration ends.
Launching workers for deduplication
It is necessary to launch the servers beforehand with the corresponding -s and -p parameters.
./processing_steps/3/deduplicate.py
Usage:
./deduplicate.py -i INPUT_DIRECTORY -o OUTPUT_DIRECTORY -w WORKERS_FILE -m MAP_FILE [-p PORT] [-e ERRORS_LOG_FILE] [-b BINARY] [-n] [-d] [-wl]
-i --input - input folder with files for deduplication
-o --output - output folder; if it does not exist, the script attempts to create it
-w --workers - file with the list of workers in the format: HOSTNAME '\t' THREADS '\n' (one line per machine; beware, replacing tabulators with spaces will not work)
-p --port - server port (default 1234)
-e --errors - if set, errors are logged into this file with the current date and time
-b --binary - path to the deduplication binary (default is /mnt/data/commoncrawl/corpproc/bin/dedup)
-t --threads - number of threads (default is 6; if thread counts are given in the workers file, they have higher priority)
-n --near - uses the "nearDedup" algorithm
-d --debug - simultaneously with the output file .dedup, a .dedup.dropped file containing removed duplicates and a .dedup.debug file containing debugging dumps are generated
-wl --wikilinks - deduplication of the Wikilinks format
-dr --dropped - for each output file .dedup a .dedup.dropped file containing removed duplicates is generated
-f --feedback - records in the .dedup.dropped file contain references to the records responsible for their elimination (see below)
-dd --droppeddoc - a file "droppedDocs.dd" containing the list of completely excluded documents will be created in the output directory
-j --journal - continue deduplication after a system crash: already processed files are skipped, unfinished ones are finished (use with -j on the server)
-m --map - hash distribution map

Deduplication of the Wikilinks format is implemented so that a hash is calculated for the concatenation of columns 2, 3, 5 and 6; all of these columns have to be identical for a row to be evaluated as a duplicate. NearDedup calculates hashes with the help of N-grams on the concatenation of columns 5, 3 and 6 (in this order), and then calculates the hash of column 2. A row is a duplicate only if both hashes match (logical conjunction). A sketch of this hashing is shown after the launch example below.
Example:
./deduplicate.py -i /mnt/data/commoncrawl/CC-2015-18/vert/ -o /mnt/data/commoncrawl/CC-2015-18/dedup/ -w ~/workers -m ~/hashmap
# Data from the folder /mnt/data/commoncrawl/CC-2015-18/vert/ are deduplicated into the folder /mnt/data/commoncrawl/CC-2015-18/dedup/
# Deduplication runs on the machines specified in the file ~/workers
# A running server is expected on the machines specified in the file ~/hashmap
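The Wikilinks hashing rule described above can be sketched as follows. Only the choice and order of columns follows the text; the hash function, the use of character n-grams and the n-gram size are assumptions.

# Sketch of Wikilinks duplicate detection (columns are 1-indexed as in the text).
# Exact mode: one hash over columns 2, 3, 5, 6.
# nearDedup:  n-gram hashes over columns 5, 3, 6 plus a hash of column 2;
#             a row is a duplicate only if BOTH checks report a match.
def wikilinks_exact_hash(cols):
    return hash("\t".join((cols[1], cols[2], cols[4], cols[5])))

def wikilinks_near_hashes(cols, n=3):
    text = " ".join((cols[4], cols[2], cols[5]))
    ngrams = {text[i:i + n] for i in range(len(text) - n + 1)}
    return {hash(g) for g in ngrams}, hash(cols[1])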
Located:
/processing_steps/3/scripts/dedup_check.py
Example:
dedup_check.py -s ~/servers -w ~/workers -i ~/CC-2015-35/vert/ -o ~/cc_test/output/ -sh ~/cc_test/server/
-i --input - input directory with verticals
-o --output - output directory with deduplicated verticals
-sh --serverhash - output server directory - directory used to save the structure of hashes (--output with TCP, --store with MPI)
-s --servers - list of servers, same format as in deduplication = 1 hostname per line (TCP dedup only)
-w --workers - list of workers, same format as in deduplication = 1 hostname per line (TCP dedup only)
-m --mpi - switch to be used with MPI deduplication on Salomon
-h --help - print help

Output:
---------------- Worker knot35 ----------------
Worker running: YES
Local input files: 717
Local missing files: 529 (73.77%)
Local unfinished files: 6
...
---------------- Server athena4 ----------------
Server running: YES
OK
...
---------------- Summary ----------------
Total files: 33867
Missing files: 25185 (74.36%)
Unfinished files: 276
----------------
Total workers: 46
Workers running: 46
Workers finished: 0
Workers crashed: 0
----------------
Total servers: 18
Servers running: 18
Servers finished: 0
Servers crashed: 0
Total journals found: 18
After stopping deduplication with the SIGKILL signal, it is possible to continue using the switches -j (server) and -r (client).
---------------- Worker athena7 ----------------
Worker running: NO
WARNING: Worker crashed!
Local input files: 728
Local missing files: 533 (73.21%)
Local unfinished files: 6
...
---------------- Server knot26 ----------------
Server running: NO
Warning: Server crashed! - 1 journals found
---------------- Server knot30 ----------------
Server running: NO
Server finished!
...
---------------- Summary ----------------
Total files: 33867
Missing files: 25092 (74.08%)
Unfinished files: 275
----------------
Total workers: 46
Workers running: 0
Workers finished: 0
Workers crashed: 46
----------------
Total servers: 18
Servers running: 0
Servers finished: 16
Servers crashed: 2
Total journals found: 2
Use this script to test the correct behavior of the deduplication software:
testing/dedup_test.py
It runs a test on the specified servers and input data:
-s --servers - server list file, 1 hostname per line (default = ./servers.txt)
-i --input - input data directory (first run with -c) (default = /mnt/data/commoncrawl/CC-2015-40/vert/)
-b --bin - folder with dedup sources (processing_steps/3/) (default = ../)
-d --debug - enable debug logs (test script + server + worker)
-c --clean - make clean + set up input data - mandatory for the first run

Run example:
python3 dedup_test.py -c
You can omit -c in subsequent runs.
The deduplication library (libdedup) allows you to use object-oriented deduplication directly from Python.
Source files:
git:[corpproc_dedup]:/processing_steps/3/libdedup
Note:
Do not use libdedup on the shared minerva1 home directory; use a local drive instead!
| Debian based (Ubuntu) | RPM based (Fedora/CentOS) |
|---|---|
| gcc, g++ | gcc, g++ |
| libglib2.0-dev >= 2.48* | glib2-devel |
| libgirepository1.0-dev | gobject-introspection-devel |
| libboost-system-dev, libboost-filesystem-dev | boost-devel |
| pssh, screen, python | pssh, screen, python |
Note:
*KNOT servers run Ubuntu 14.04.5, which contains libglib2.0-dev version 2.40; version 2.48.2 is built automatically for the purposes of libdedup using autobuild.sh
If you are uncertain about the libglib2.0-dev version, use the command:
bash autobuild.sh
in directory processing_steps/3/libdedup
.
If needed, glib2 2.48.2 will be compiled automatically.
Newer distribution versions (with libglib2.0-dev >= 2.48) can also use:
bash build.sh
in directory libdedup/src
.
A short usage example can be found here:
libdedup/test.py
To use libdedup directly from the Python interpreter, use:
libdedup/load.sh
The script will set up the environment variables and launch Python.
When using libdedup in your own script, you have to set up the environment variables manually:
export PYTHONPATH=processing_steps/3/libdedup/
export GI_TYPELIB_PATH=processing_steps/3/libdedup/src/
export LD_LIBRARY_PATH=processing_steps/3/libdedup/src/
(Do not forget to change path for your needs)
Import libdedup using:
import dedup
Create new dedup object using:
de = dedup.dedup(args - see below)
bin_path = /mnt/data/commoncrawl/libdedup/bin/ - folder with the server binary (same as TCP/IP dedup)
hash_path = /mnt/data/commoncrawl/libdedup/hashes/ - folder for loading/saving hashes
load = True - load hashes from the input folder (saving is implicit)
near = False - use the nearDedup algorithm
mapFile = ./hashmap.conf - distribution map path
excluded = None - list of excluded servers
port = 1237 - server port
rport = 1238 - hash migration port
migration = False - enable hash migration
debug = False - enable debug logs
dropped = False - save dropped docs and paragraphs into vert.drop

Start servers with:
de.start_servers()
Returns 0 if successful. Servers will stay running even after the Python interpreter is terminated, so it is important to shut them down correctly using:
de.stop_servers()
Returns 0 if successful, i.e. if servers were running on the chosen port. If you want to connect to already running servers, remember to initialize the dedup object with the same arguments.
You can test whether the servers are running on the chosen port using:
de.test_servers()
Returns 0, if running.
Create worker using:
worker = de.create_worker()
The method will test the servers and create a worker that connects to them. At this point everything is ready and the worker is waiting for a verticalized article as input.
Deduplicate article using:
vert = worker.process(DATA)
On output we get a Vert object with the following attributes:
vert.result - return code
vert.data - contains a string representing the deduplicated verticalized article
vert.dropped - number of dropped paragraphs
vert.kept - number of kept paragraphs
vert.drop - contains the dropped paragraphs (when initialized with dropped=True, otherwise NULL)

It is possible to create multiple workers, even in multiple threads. Vert and Worker objects are destroyed using del vert or del worker.
For deduplication of multiple vert files containing a lot of articles, use the classic deduplication.
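A minimal end-to-end sketch combining the calls documented above; the constructor arguments, the input file name and the missing error handling are illustrative.

import dedup

DATA = open("article.vert").read()       # one verticalized article (hypothetical file)

# Illustrative arguments - adjust paths and ports for your environment.
de = dedup.dedup(bin_path="/mnt/data/commoncrawl/libdedup/bin/",
                 hash_path="/mnt/data/commoncrawl/libdedup/hashes/",
                 mapFile="./hashmap.conf", port=1237, dropped=True)

if de.start_servers() == 0:              # 0 means the servers started successfully
    worker = de.create_worker()          # connects to the running servers
    vert = worker.process(DATA)
    print(vert.result, vert.kept, vert.dropped)
    del vert
    del worker
    de.stop_servers()                    # always shut the servers down correctly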
The Salomon supercomputer architecture differs from the KNOT servers architecture. Nodes are connected using the InfiniBand technology, and the OpenMPI environment is used for processing.
There is no easy way to enter the OpenMPI environment at run time and connect to the servers. Everything needs to be set up at launch.
We can imagine OpenMPI as a closed box in which several workers, hashholders and 2 managing cores are located. One core reads verticals from STDIN and assigns them to workers; the other collects deduplicated verticals and saves them to output files.
Initialization of and communication with the MPI environment is done using the Python wrapper mpidedup.py, which contains an input queue. A document is placed in the queue, where it waits to be collected by a writing thread and sent to the OpenMPI environment.
Scripts:
processing_steps/salomon/mpidedup/lib
Build
The binary file is created using the make command in the folder processing_steps/salomon/mpidedup (it is built together with the Salomon deduplication) and is then placed in the folder lib.
The distribution of Salomon cores between workers and hashholders can be set at launch. If a hash map is used, the number of hashholders is set automatically. At least hashholder_number + 3 cores are required.
You can set the following parameters for the mpidedup object:
output_dir - required - output directory for processed documents
bin_dir - folder with the libmpidedup binary
store_dir - hash load & store folder
workers - number of workers
debug - enable debug logs
dropped - generate *.dropped files containing dropped paragraphs
droppeddoc - generate *.dd files containing the list of processed documents
map_path - path to the hash distribution map

$ module load Python/3.6.1
$ module load Boost/1.59.0-intel-2016.01
$ module load OpenMPI/1.8.8-GNU-4.9.3-2.25
$ python3
>>> import mpidedup
de = mpidedup.mpidedup(output_dir, bin_dir="~/bin/", dropped=True, droppeddoc=True)
de.start()
# de.queue = input queue
de.queue.put(vert)
de.finish()  # finalize deduplication after the last document from input is processed, blocks until dedup is done
# de.stop()  # stop deduplication now! - files on input will remain unprocessed
For a more complex example see lib/libmpidedup_test.py and lib/run_test_salomon.sh.
You can run salomon test using:
qsub run_test_salomon.sh
Distributing a document into files
To preserve the original document distribution, add the source file name into the document header like this:
vert_source="filename"
For instance:
<doc vert_source="1443736676547.12_20151001215756-00001.vert" id="..." url="..." title="...">
When the source file name is not specified, documents will be distributed automatically.
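A small sketch of injecting the vert_source attribute into an existing <doc ...> header line; the helper function and the regular expression are illustrative, only the attribute format follows the example above.

# Insert vert_source="<filename>" into a <doc ...> header line (illustration).
import re

def add_vert_source(header_line, filename):
    if 'vert_source=' in header_line:
        return header_line                            # already present - keep as is
    return re.sub(r'^<doc\b',
                  lambda m: '<doc vert_source="%s"' % filename,
                  header_line, count=1)

print(add_vert_source('<doc id="..." url="..." title="...">', "example.vert"))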