Removing duplicates from text

1 Introduction

1.1 Motivation

When crawling a very large number of websites (on the order of billions of pages) with the Spider method, the resulting web archive frequently contains duplicate information. For further processing we only need one record of each piece of information, so that the same content is not processed multiple times.

Usually these tend to be websites such as:

From each website, we want to keep only the essential/unique information for further processing.

1.2 Principle

The Internet represents an enormous amount of data. For each website, it is necessary to check whether a specific article/part is already in our database.

For this reason, deduplication is executed as a distributed computation. The individual processes are divided into "servers" and "workers".

1.2.1 Server

Represents a database of known articles/parts, which are stored as hashes (20-digit integers). If there are multiple servers, each server is responsible only for a small part of the hash space. Its only task is to process requests checking whether a particular hash already exists in the database; if it does, the hash is marked as a duplicate. If it is new, it is marked as unique and saved to the database. The database can be loaded from a file during server start-up and saved after successful termination.
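
The following minimal Python sketch illustrates this check-and-insert behaviour (an illustration only; the real server is a compiled binary that communicates with workers over the network):

 # Illustration of the server's check-and-insert logic (not the real server code).
 known_hashes = set()              # in the real server, this can be preloaded from -i

 def check_hash(h):
     """Return True if h is a duplicate, False if it is new (and remember it)."""
     if h in known_hashes:
         return True               # duplicate
     known_hashes.add(h)           # new hash -> store it
     return False                  # unique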

1.2.2 Worker

Processes parts of commoncrawl in the vertical (one word per line, HTML-free) format. The header of a website is loaded first, and the worker checks whether the URL together with the title is unique. If not, it calculates the hash of the whole article; if the whole article is a duplicate, it is excluded. If it is not a duplicate, deduplication of the content begins. The text is divided into paragraphs, and paragraphs are classified as short (shorter than a constant, usually 50 characters) or long. A short paragraph can be, for example, a title or a short description; it can be identical in many articles, and since we do not want to remove a title such as "Gallery" from the whole Internet, it is kept in a buffer. When a long paragraph is encountered, its hash is calculated and the corresponding server is contacted. If the paragraph is unique, it is kept; if not, it is thrown away.

In this manner each worker goes step by step through every file in the input directory. The result is one *.dedup file for each input file, free of duplicates with respect to the data processed so far.
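
A minimal sketch of the paragraph handling described above (illustrative only; SHORT_PARAGRAPH_LIMIT and is_duplicate_on_server are stand-ins for the real constant and the server request, and Python's built-in hash() stands in for the real 20-digit hash):

 # Illustration of the worker's paragraph handling (not the real dedup code).
 SHORT_PARAGRAPH_LIMIT = 50        # constant mentioned above (usually 50 characters)

 def dedup_paragraphs(paragraphs, is_duplicate_on_server):
     """Keep short paragraphs unconditionally; ask a server about long ones."""
     kept = []
     for p in paragraphs:
         if len(p) < SHORT_PARAGRAPH_LIMIT:
             kept.append(p)        # short paragraph (e.g. "Gallery") is kept
         elif not is_duplicate_on_server(hash(p)):   # hash() stands in for the real hash
             kept.append(p)        # unique long paragraph is kept
         # duplicate long paragraphs are dropped
     return kept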


2 Deduplication

All deduplication programs are in the KNOT repository on the corpproc_dedup branch.

The programs dedup and server are used for deduplication and are available after compilation via the Makefile in the directory:

 [git::corpproc_dedup]/processing_steps/3/dedup/
        

2.1 Server

Launch parameters:

 ./server -m=~/hashmap.conf [-i=INPUT_FILE] [-o=OUTPUT_FILE [-d]] [-s=STRUCT_SIZE] [-p=PORT] [-j=JOURNAL_FILE] [-k] [-a] [-P=RPORT]
        

The server runs until it is "killed". Hashes are saved to the output file (if a path is specified) in reaction to the SIGHUP and SIGTERM signals. The SIGPIPE signal is ignored. The server also generates a journal file if a path to the hash output file is specified. Each received hash is recorded there in case an error or a server crash occurs before all hashes can be saved to the output file.

To recover hashes from the journal, it is necessary to launch the server with the parameter -j=~/server_hash.backup. The hashes are loaded together with the input file (if specified), and if the output file is specified, the recovered hashes are saved as output data.

After the server terminates successfully, the journal file (*.backup) is removed. For example:

 ./server -m=~/hashmap.conf -i=~/server_hash -o=~/server_hash -j=server_hash.backup
        

2.2 Worker

Launch parameters:

./dedup -i=INPUT_DIR -o=OUTPUT_DIR -s=SERVERS_STRING [-p=PORT] [-t=THREADS] [-n] [-d] [-wl] [-uf=FILTER_FILE]
        

The scripts server.py and deduplicate.py were created for easier launching on multiple machines in parallel. The programs have to be pre-distributed to all the machines used and have to be in the same location, e.g. /mnt/data/bin/dedup

2.3 Hash redistribution

Hashes are divided into blocks (1999 by default; the number has to be specified before the first processing and cannot be changed afterwards). Before the first launch, the blocks are distributed equally among the servers using the distribute program, which generates a config file.

The distribute program runs alongside the worker and server programs.

Usage:

 ./distribute [-s=SERVER_FILE] [-i=INPUT_FILE] [-o=OUTPUT_FILE] [-b=BLOCK_COUNT] [-d] [-q] [-f] [-E=EXC_FILE]
        

2.3.1 Blocks distribution

Blocks are distributed based on the remainder of the block number divided by the number of servers.

Example for 10 servers:

 block 0, 10, 20, ... distributed to server with index value 0
 block 1, 11, 21, ... distributed to server with index value 1
 ...
 block 9, 19, 29, ... distributed to server with index value 9
        

For 100 blocks (0-99), each server will carry 10 blocks. This scheme allows us to add more servers. A new server would have index value 10, therefore every block with a remainder of 10 after dividing by 11 (the number of servers after the addition) will be distributed there:

 blocks: 10, 21, 32, 43, 54, 65, 76, 87 and 98
        

Note that every original server except server 9 lost a block.

 new distribution: servers 0..8 and 10 carry 9 blocks, server 9 carries 10 blocks (10 * 9 + 1 * 10 = 100)
        

However, this causes an issue if we add a server with index value 11:

 blocks on server 11: 11, 23, 35, 47, 59, 71, 83, 95
        

This would mean that servers 1, 3 and 5 each lose two blocks, while servers 7 and 9 lose one. The total number of blocks stays the same, but they would no longer be distributed equally across all the servers.

When we remove a server, its blocks are redistributed equally among the remaining servers using the same method. This means that when we later add a server again, the distribution could be unequal without optimization.
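
The modulo assignment and the incremental migration described in this section can be sketched as follows (illustrative only; 100 blocks are used as in the example above):

 # Illustration of the modulo-based block assignment from section 2.3.1.
 BLOCKS = 100                                   # blocks 0..99, as in the example above

 def owner(block, server_count):
     """Index of the server responsible for a given block."""
     return block % server_count

 def blocks_moving_to_new_server(old_count):
     """Blocks that migrate when a server with index old_count is added."""
     new_count = old_count + 1
     return [b for b in range(BLOCKS) if b % new_count == old_count]

 print(blocks_moving_to_new_server(10))         # -> [10, 21, 32, 43, 54, 65, 76, 87, 98]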

2.3.2 Optimizing blocks distribution

To prevent the issue mentioned above, the distribution is optimized.

If a server's load is at least 5% higher than it should be, its excess blocks are redistributed to the servers with a lower load. The optimization can be disabled using the -q --quick argument at launch; disabling it is not advised for regular use.
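
A rough sketch of this rule, assuming that "load" means the number of blocks a server holds (the real distribute program may measure it differently):

 # Rough illustration of the 5% rule from section 2.3.2 (not the real distribute code).
 def overloaded_servers(block_counts, threshold=0.05):
     """Return servers holding at least 5% more blocks than the ideal equal share."""
     ideal = sum(block_counts.values()) / len(block_counts)
     return [s for s, n in block_counts.items() if n >= ideal * (1 + threshold)]

 # Excess blocks of the servers returned here are handed over to servers below the
 # ideal share, which costs extra transfers but keeps the distribution even.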

2.3.3 Tests and examples

In most cases you can run the program without additional arguments; however, it is advisable to consider the situation when adding multiple servers.

There are multiple ways to handle extensive changes (e.g. going from 2 to 10 servers).

Launch:

 ./distribute -i=dist.conf >/dev/null
        

Output:

 Servers: 2 -> 10
 Blocks to be redistributed: 1583 (79.1896%)
 Average aberrancy: 2 blocks(0.135068%)
 Max. aberrancy: 9 blocks(0.450225%)
        

Launch with the parameter -f --full:

 ./distribute -f -i=dist.conf >/dev/null
        

Output:

 Servers: 2 -> 10
 Blocks to be redistributed: 1599 (79.99%)
 Average aberrancy: 0 blocks(0.0450225%)
 Max. aberrancy: 1 blocks(0.050025%)
        

In this case, it is advised to perform a new distribution (without an input file, or using the -f --full parameter).

Output when changing the number of servers from 10 to 11 without -f --full parameter:

 Servers: 10 -> 11
 Blocks to be redistributed: 182 (9.10455%)
 Average aberrancy: 0 blocks(0.0363818%)
 Max. aberrancy: 1 blocks(0.050025%)
        

Output with parameter -f --full:

 Servers: 10 -> 11
 Blocks to be redistributed: 1809 (90.4952%)
 Average aberrancy: 0 blocks(0.0363818%)
 Max. aberrancy: 1 blocks(0.050025%)
        

This means that we would move 1809 blocks instead of 182.

If we try to add more servers now, the optimization will take effect:

 Servers: 11 -> 14
 Blocks to be redistributed: 497 (24.8624%)
 Average aberrancy: 3 blocks(0.196527%)
 Max. aberrancy: 7 blocks(0.350175%)
        

If we want to minimize the number of transferred blocks, we can disable the optimization using the -q --quick parameter:

 ./distribute -q -i=dist.conf >/dev/null
        

Output:

 Servers: 11 -> 14
 Blocks to be redistributed: 367 (18.3592%)
 Average aberrancy: 22 blocks(1.12556%)
 Max. aberrancy: -50 blocks(-2.50125%)
        

Note that both the average and the maximum aberrancy have increased significantly.

2.3.4 Changing server count

When a new distribution is generated, it is necessary to set up all the servers. If any servers are excluded (-E in the distribute program), these servers need to be specified at launch. Excluded servers will start and send their hashes to the new owners according to the distribution. Launching the servers with the new distribution can be done using the following command (server.py is used here, see section 3 Triggers):

 python3 server.py start -a -E ~/excluded.list -m ~/hashmap.conf -i /tmp/testdata/ -o /tmp/testdata -b ~/bin/server
        

Migration is launched automatically. The worker launch script waits until the migration ends.

2.3.5 Continuing deduplication after a system crash

Both the server and the worker have a function that allows them to continue deduplication after a system crash. Logs for interrupted vertical files (worker) and unsaved hashes (server) are created automatically. To take these into account when restarting deduplication, use the argument -r --restore for the worker and -j --journal for the server. Finished deduplicated verticals in the output directory will be skipped. Unfinished ones, recognizable by a *.dedup.log file (removed after successful processing), will be finished and their log removed.
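
The worker-side restart decision can be pictured roughly like this (a sketch only, based on the *.dedup / *.dedup.log naming described above):

 # Illustration of the worker's restart decision for one input vertical.
 import os

 def restart_action(dedup_path):
     """dedup_path is the expected OUTPUT_DIR/<name>.dedup file for an input vertical."""
     if os.path.exists(dedup_path + ".log"):
         return "finish"           # interrupted vertical: finish it, then remove the log
     if os.path.exists(dedup_path):
         return "skip"             # finished vertical, nothing to do
     return "process"              # not started yet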

Crash causes and possible solutions:

 Client fail (139 - SEG. FAULT)
 - should not occur; if it does, please report the circumstances of the crash to xcubae00@stud.fit.vutbr.cz
 - failure of a specific client does not influence the servers or the other clients
 - it is possible to launch the client manually:
   - "./dedup -i=INPUT_DIR -o=OUTPUT_DIR -s=list of running servers separated by spaces"
 - it is advised to repeat the launch with the parameters -j (server) and -r (worker) when the process is finished
   (just restart the specific client; the others should skip each already finished file, so it should not matter)

 Server failure (139 - SEG. FAULT)
 - occurred while recovering hashes after a crash from a very large journal
 - will be investigated further and fixed
 - relaunching helps

 Client termination (143 - SIGTERM / 137 - SIGKILL / 129 - SIGHUP)
 - the client received a SIGTERM/SIGKILL/SIGHUP signal
 - it influences the other clients, which terminate with signal 141 (SIGPIPE)
 - solution: relaunch; for 129 use "nohup" or "disown -h" (use another port if problems occur during launch)

 Client crash (141 - SIGPIPE)
 - the client terminated because of a failure of another server/client

 Problems with server launch - [FAILURE] hostname Exited with error code 1
 - issues with data are indicated in server.py
 - deduplicate.py cyclically tests the status of the servers before starting a process; code 1 means that the particular server is not ready.
   This can be caused by an ongoing recovery of hashes from a journal, or by loading hashes from a large input file.
   If the number of ready servers does not change for a longer time (>1 min), it is possible that some servers were not launched successfully.
   The problem can be diagnosed by connecting to the particular server and running "screen -r", which switches to the server console.
 - a common problem is "bind() failed: Address already in use" - files from an unsuccessful run remain on the machine
   - Solution 1: hold on and try again in a bit (saving hashes may take a couple of minutes!); try to terminate the servers using server.py
   - Solution 2: (be cautious, your other processes may still be running!) terminate all of your processes on the particular server (pkill -SIGKILL -u USER)
   - Solution 3: use another port
        

2.3.6 List of processed documents

When using the parameter -dd, a *.dedup.dd file with the following format will be created:

 <dd url="https:/..." title="Some title" status="X"/>

X can be:

For example, you can check document stats using:

 grep "status=\"K\"" *vert.dedup.dd
        

or:

 grep "status=\"*/*\"" *vert.dedup.dd
        

Deduplication of URLs and document names

To minimize the relatively time-consuming communication between worker and server, the worker attempts to exclude duplicate documents at the very beginning. The file name and URL are checked; however, it cannot be assumed that every document with the same name/URL has exactly the same content. For this reason the document is only tagged as a potential duplicate and its entire content is loaded into a buffer. The hash of the whole buffer is then calculated and sent to a server for evaluation. A duplicate document is dropped, while a unique one continues with the standard deduplication procedure.
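
A minimal sketch of this two-stage check (illustrative only; check_hash_on_server stands for the request to the server responsible for the given hash, and Python's hash() stands in for the real hash function):

 # Illustration of the document-level pre-filter (not the real worker code).
 def document_is_duplicate(title, url, content, check_hash_on_server):
     """check_hash_on_server(h) returns True if h is already known to the servers."""
     if not check_hash_on_server(hash((title, url))):
         return False              # title/URL not seen before: treat the document as new
     # The same title/URL does not guarantee identical content, so the whole
     # buffered document is hashed and the server makes the final decision.
     return check_hash_on_server(hash(content))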

Comparison of time spent

The test configuration was 3 machines: 1× server (2 threads) and 2× worker (2×2 threads). Verticals of commoncrawl (CC-2015-40) were processed, 10 verticals totalling 1.1 GB per worker, i.e. 2.2 GB of data in total.

                              With filter (see above)   Without filter            Without checking URL, titles and buffering
 Duration**:                  2m 54.187s                3m 14.327s                3m 10.427s
 Paragraphs thrown away**:    1,328,824                 1,354,775                 1,354,775
 Retained paragraphs:         5,048,902                 5,104,177                 5,104,177
 Titles/URL thrown away:      4,649 (filters)           0 - dedup. within 1 CC    N/A
 Retained titles/URL*:        283,423                   422,107                   N/A
Notes:
*Retained titles/URLs are usually deduplicated further by paragraphs.
**Results can differ slightly between runs when more machines/threads are used; it depends on the order of hash processing. The numbers of retained paragraphs and titles/URLs should remain unchanged, though.

By using multiple threads, the computation time can be reduced by more than 50% (1m 21.932s with 6 threads, 0m 44.167s with 10 threads per worker; times are without filtering). It is also worth considering the use of multiple servers (the presumed suitable ratio is approximately 1:4).

Feedback mode

It is used to check the validity of deduplication. Information about the location of the duplicate hash is added to each excluded record (.debug.dropped). This information consists of three numbers: the first says which record of the input file it is; the second and third are a reference to the duplicate record, where the second identifies the input file (the hash of its path) and the third says which record of that file it is. The following format is used (added before each paragraph, except for short paragraphs): <t>POSITION_OF_DROPPED \t POSITION_OF_DUPLICIT</f>. The wikilinks format uses 2 extra columns. Both POSITION_OF_DROPPED and POSITION_OF_DUPLICIT consist of 2 parts separated by a colon, in the format FILE_HASH:OFFSET_IN_FILE, where FILE_HASH is the hash of the input file path and OFFSET_IN_FILE is the record number (starting with 1). This mode can be enabled at worker launch using the -f parameter. The servers do not need any additional parameters; they automatically detect whether they should save the location of the hash along with the hash itself. The locations are not saved to the output file, which means that they are lost when the server is terminated.
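
A small sketch of parsing these positions (illustrative only; the exact layout of a .dropped record may differ):

 # Illustration of parsing the feedback-mode positions.
 def parse_position(text):
     """Parse 'FILE_HASH:OFFSET_IN_FILE' into (file_hash, record_number)."""
     file_hash, offset = text.split(":")
     return file_hash, int(offset)                 # record numbering starts at 1

 def parse_feedback(record):
     """Parse 'POSITION_OF_DROPPED\tPOSITION_OF_DUPLICIT' from a dropped record."""
     dropped, duplicate = record.strip().split("\t")
     return parse_position(dropped), parse_position(duplicate)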

Script checking deduplication

The script requires deduplication to have been launched with the arguments -f and -dr, so that .dropped files with references to the duplicates are created. The script uses these references to print a list of supposed duplicate records. It requires a dropped file (parameter -d) and a file (parameter -f) corresponding to the deduplication logs. To use the wikilinks format, launch it with the -wl switch. Records can be skipped using the -n switch (going through all records is too lengthy), so -n 10 means that only every tenth record will be checked.


3 Triggers

Launching server for deduplication

With the start argument, the script first launches screens and then the servers inside them. With the stop argument, the script closes the screens. If none of start, stop, restart is given, the script first tests whether the screens are running and then whether the servers are.

 ./processing_steps/3/server.py
        

Usage:

 ./server.py [start|stop|restart|migrate] -m MAP_FILE [-i INPUT_FILE] [-o OUTPUT_FILE [-d]] [-a] [-t THREADS] [-p PORT] [-e ERRORS_LOG_FILE] [-b BINARY] [-d] [-j] [-k] [-r SPARSEHASH_SIZE] [-P rport]
        

Note:
-j, --journal: to recover hashes from the journal, it is necessary to enter the path to the input file that was specified as the output file before the server crash. The script checks whether "input_file".backup exists. If the input file does not exist on some of the servers, it will be created.

Examples:

 ./server.py start -m ~/hashmap
 # Launches screens on the machines specified by the file ~/hashmap and the servers inside them
 # Servers are waiting for workers to connect

 ./server.py -m ~/hashmap
 # Tests if the screens and servers on machines specified by file ~/hashmap are running

 ./server.py stop -m ~/hashmap -E ~/excluded.list
 # Closes the screens and servers on the machines specified by the files ~/hashmap and ~/excluded.list

 ./server.py migrate -m ~/hashmap -E ~/excluded.list -i ~/input.hash -o ~/output.hash
 # Launches hash migration according to the hash map. Servers are stopped and hashes are saved when migration ends.
        

Launching workers for deduplication

It is necessary to launch the servers beforehand, with the exact -s and -p parameters.

 ./processing_steps/3/deduplicate.py
        

Usage:

 ./deduplicate.py -i INPUT_DIRECTORY -o OUTPUT_DIRECTORY -w WORKERS_FILE -m MAP_FILE [-p PORT]  [-e ERRORS_LOG_FILE] [-b BINARY] [-n] [-d] [-wl]
        

Deduplication of the wikilinks format is implemented as follows: a hash is calculated over the concatenation of columns 2, 3, 5 and 6, and all of these columns have to match for a row to be evaluated as a duplicate. Neardedup calculates hashes using N-grams over the concatenation of columns 5, 3 and 6 (in this order) and then calculates the hash of column 2. A row is a duplicate only if both of these checks report a match.
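
A minimal sketch of the exact-match part of this wikilinks check (illustrative only; columns are numbered from 1 as in the text and Python's hash() stands in for the real hash function):

 # Illustration of the exact-match wikilinks check (not the real dedup code).
 def wikilinks_hash(columns):
     """columns is the list of tab-separated fields of one row (index 0 = column 1)."""
     key = "".join(columns[i - 1] for i in (2, 3, 5, 6))   # columns 2, 3, 5, 6
     return hash(key)                                      # stand-in for the real hash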

Example:

 ./deduplicate.py -i /mnt/data/commoncrawl/CC-2015-18/vert/ -o /mnt/data/commoncrawl/CC-2015-18/dedup/ -w ~/workers -m ~/hashmap 
 # Data from the folder /mnt/data/commoncrawl/CC-2015-18/vert/ is deduplicated into the folder /mnt/data/commoncrawl/CC-2015-18/dedup/
 # Deduplication runs on the machines specified in the file ~/workers
 # A running server is expected on the machines specified in the file ~/hashmap
        

4 Scripts

4.1 Script to check the deduplication process

Located:

 /processing_steps/3/scripts/dedup_check.py
        

Example:

 dedup_check.py -s ~/servers -w ~/workers -i ~/CC-2015-35/vert/ -o ~/cc_test/output/ -sh ~/cc_test/server/
        

Output:

 ---------------- Worker knot35 ----------------
 Worker running: YES
 Local input files: 717
 Local missing files: 529 (73.77%)
 Local unfinished files: 6
 ...
 ---------------- Server athena4 ----------------
 Server running: YES
 OK
 ...
 ---------------- Summary ----------------
 Total files: 33867
 Missing files: 25185 (74.36%)
 Unfinished files: 276
 ----------------
 Total workers: 46
 Workers running: 46
 Workers finished: 0
 Workers crashed: 0
 ----------------
 Total servers: 18
 Servers running: 18
 Servers finished: 0
 Servers crashed: 0
 Total journals found: 18
        

If deduplication was stopped with the SIGKILL signal, it is possible to continue using the -j (server) and -r (client) switches.

 ---------------- Worker athena7 ----------------
 Worker running: NO
 WARNING: Worker crashed!
 Local input files: 728
 Local missing files: 533 (73.21%)
 Local unfinished files: 6
 ...
 ---------------- Server knot26 ----------------
 Server running: NO
 Warning: Server crashed! - 1 journals found
 ---------------- Server knot30 ----------------
 Server running: NO
 Server finished!
 ...
 ---------------- Summary ----------------
 Total files: 33867
 Missing files: 25092 (74.08%)
 Unfinished files: 275
 ----------------
 Total workers: 46
 Workers running: 0
 Workers finished: 0
 Workers crashed: 46
 ----------------
 Total servers: 18
 Servers running: 0
 Servers finished: 16
 Servers crashed: 2
 Total journals found: 2
        

4.2 Testing script

Use this script to test the correct behavior of the deduplication software:

 testing/dedup_test.py
        

It runs a test on the specified servers and input data:

Run example:

 python3 dedup_test.py -c
        

You can omit -c in subsequent runs.


5 Deduplication library

Deduplication library (or libdedup) allows you to use object-oriented deduplication directly from Python.

Source files:

 git:[corpproc_dedup]:/processing_steps/3/libdedup
        

Note:
Do not use libdedup on the shared minerva1 home directory; use a local drive instead!

5.1 Software requirements

 Debian based (Ubuntu)                          RPM based (Fedora/CentOS)
 gcc, g++                                       gcc, g++
 libglib2.0-dev >= 2.48*                        glib2-devel
 libgirepository1.0-dev                         gobject-introspection-devel
 libboost-system-dev, libboost-filesystem-dev   boost-devel
 pssh, screen, python                           pssh, screen, python
Note:
*KNOT servers run Ubuntu 14.04.5, which contains libglib2.0-dev version 2.40; version 2.48.2 is built automatically for the purposes of libdedup by autobuild.sh

5.2 Build

If you are uncertain about the libglib2.0-dev version, use the command:

 bash autobuild.sh
        

in directory processing_steps/3/libdedup.

If needed, glib2 2.48.2 will be compiled automatically.

Newer distribution versions (with libglib2.0-dev >= 2.48) can also use:

 bash build.sh
        

in directory libdedup/src.

5.3 Usage

Short usage example can be found here:

 libdedup/test.py
        

To use libdedup directly from Python interpreter use:

 libdedup/load.sh
        

Script will set up environment variables and launch Python.

When using libdedup in your own script, you have to set up the environment variables manually:

 export PYTHONPATH=processing_steps/3/libdedup/
 export GI_TYPELIB_PATH=processing_steps/3/libdedup/src/
 export LD_LIBRARY_PATH=processing_steps/3/libdedup/src/
        

(Do not forget to change path for your needs)

5.3.1 Usage in Python

Import libdedup using:

 import dedup
        

Create new dedup object using:

 de = dedup.dedup(args - see below)

Start servers with:

 de.start_servers()
        

Returns 0 if successful. The servers stay running even after the Python interpreter terminates, so it is important to shut them down correctly using:

 de.stop_servers()
        

Returns 0 if successful, i.e. if servers were running on the chosen port. If you want to connect to already running servers, remember to initialize the dedup object with the same args.

You can test whether servers are running on the chosen port using:

 de.test_servers()
        

Returns 0, if running.

Create worker using:

 worker = de.create_worker()
        

The method tests the servers and creates a worker, which connects to them. At this point everything is ready and the worker waits for a verticalized article as input.

Deduplicate article using:

 vert = worker.process(DATA)
        

The output is a Vert object with the following attributes:

It is possible to create multiple workers, even in multiple threads. Vert and Worker objects are destroyed using del vert or del worker.
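
Putting the calls above together, a typical libdedup session might look like this (a sketch only; the constructor arguments are not listed in this section and are therefore omitted here):

 # Sketch of a complete libdedup session assembled from the calls documented above.
 import dedup

 DATA = "..."                      # contents of one verticalized article
 de = dedup.dedup()                # pass the args described for the dedup object
 if de.start_servers() == 0:       # 0 = servers started successfully
     worker = de.create_worker()   # connects to the running servers
     vert = worker.process(DATA)   # deduplicate one article
     del vert
     del worker
     de.stop_servers()             # servers would otherwise keep running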

For deduplication of multiple vert files containing many articles, use the classic deduplication.


6 Deduplication library for salomon

The architecture of the Salomon supercomputer differs from the architecture of the KNOT servers. Nodes are connected using InfiniBand and the OpenMPI environment is used for processing.

There is no easy way to enter the OpenMPI environment at run time and connect to the servers; everything needs to be set up at launch.

We can imagine the OpenMPI environment as a closed box containing several workers, hashholders and 2 cores that manage everything. One core reads verticals from STDIN and assigns them to workers; the other collects the deduplicated verticals and saves them to the output files.

Initialization of and communication with the MPI environment is done using the Python wrapper mpidedup.py, which contains an input queue. A document is placed in the queue, where it waits to be collected by a writing thread and sent to the OpenMPI environment.

Scripts:

 processing_steps/salomon/mpidedup/lib
        

Build

The binary file is created using the make command in the folder processing_steps/salomon/mpidedup (it is built together with the salomon deduplication) and is then placed in the lib folder.

The distribution of Salomon cores between workers and hashholders can be set at launch. If a hash map is used, the number of hashholders is set automatically. At least hashholder_number + 3 cores are required.

You can set the following parameters for the mpidedup object:

 $ module load Python/3.6.1
 $ module load Boost/1.59.0-intel-2016.01
 $ module load OpenMPI/1.8.8-GNU-4.9.3-2.25
 $ python3
 >>>
 import mpidedup
 de = mpidedup.mpidedup(output_dir, bin_dir="~/bin/", dropped=True, droppeddoc=True)
 de.start()
 #de.queue = input queue
 de.queue.put(vert)
 de.finish() # finalize deduplication after last document from input is processed, blocks until dedup is done
 #de.stop()  # stop deduplication now! - files on input will remain unprocessed
        

For a more complex example, see lib/libmpidedup_test.py and lib/run_test_salomon.sh.

You can run salomon test using:

 qsub run_test_salomon.sh
        

Distributing a document into files

To preserve the original document distribution, add the source file name to the document header like this:

 vert_source="filename"
 For instance: <doc vert_source="1443736676547.12_20151001215756-00001.vert"  id="..." url="..." title="...">
        

When the source file name is not specified, documents are distributed automatically.