#!/usr/bin/perl -w # ----------------------------------------------------------------------------- # Script created by Tim Ellis # Friendster, Inc. # Modified by Tim Ellis # Digg, Inc. # # # this script is released under the terms of the GNU General Public License # # (GPL). The copyright is held by Friendster, Inc. # # $Id: tablesync.pl,v 1.15 2005/02/18 22:44:21 time Exp $ # ----------------------------------------------------------------------------- # # ----- Script Description # this script will sync the rows between two tables: # # 1. Delete rows in tab2 that don't exist in tab1 # 2. Insert rows into tab2 from tab1 that don't exist in tab2 # 3. Update rows that are different # # this only works for tables for which the 1st column is an ID column that is # the PK for the table (or at least unique indexed) and it's assumed both # tables should have the same IDs and be otherwise the same (ie: same columns # and types) # # ----- Special Behaviours and Oddities # this picks the max(id) at the start of the run to work to. if your table is # growing during the run, this script won't deal with anything that got added # after the run started. if you have another process keeping the tables in sync # (replication?), this script won't know about it, and may delete rows or # insert rows, causing replication to break -- if you don't like this # possibility, pass the --updateonly argument to ignore insert/delete # statements and only issue updates. # # ----- MySQL-only Logic # This script plays fast and loose with column types. It should do a "desc # tablename" at the start to determine which columns need to be single-quoted # for insert/update, and which do not, but instead it deploys a dirty algorithm # and also plays on MySQL's loose typing for its columns, like the fact that # you can do "update table set dateCol = 20041015183000" when in fact that # should probably be quoted. # ----------------------------------------------------------------------------- use strict; use lib "/usr/local/lib/perl"; use Data::Dumper; use POSIX qw(strftime); use DBI; my $maxDatabaseErrors = 10; # all these variables are changed by the script later on # but defining them as =0 is useful my $debugMode = 0; my $readOnly = 0; my $verboseMode = 0; my $updateOnly = 0; my $globalLogIndent = 0; my $globalErrorCount = 0; my $syncTable = 0; my $step = 0; my $slow = 0; my $pauseX = 0; my $pauseY = 0; my $query = 0; my $beginHere = 0; my $endHere = 0; my $srcServer = 0; my $srcDatabase = 0; my $srcUser = 0; my $srcPass = 0; my $destServer = 0; my $destDatabase = 0; my $destUser = 0; my $destPass = 0; my $totalDel = 0; my $totalIns = 0; my $totalUpd = 0; my $totalSetSize1 = 0; my $totalSetSize2 = 0; my $argIn; my $noHelp = 1; while ($argIn = shift) { if ($argIn =~ /-h/ || $argIn=~/--help/) { $noHelp = 0; } elsif ($argIn =~ /^--table=(.+)$/) { $syncTable = $1; } elsif ($argIn =~ /^--dest=(.+)$/) { ($destServer, $destDatabase, $destUser, $destPass) = split (/:/, $1); } elsif ($argIn =~ /^--src=(.+)$/) { ($srcServer, $srcDatabase, $srcUser, $srcPass) = split (/:/, $1); } elsif ($argIn =~ /^--begin=(.+)$/) { $beginHere = $1; } elsif ($argIn =~ /^--end=(.+)$/) { $endHere = $1; } elsif ($argIn =~ /^--step=(.+)$/) { $step = $1; } elsif ($argIn =~ /^--slow=(.+):(.+)$/) { $slow = 1; $pauseX = $1; $pauseY = $2; } elsif ($argIn =~ /^--updateonly$/) { $updateOnly = 1; } elsif ($argIn =~ /^--verbose$/) { $verboseMode = 1; } elsif ($argIn =~ /^--debug$/) { $debugMode = 1; } elsif ($argIn =~ /^--readonly$/) { $readOnly = 1; } } unless ($noHelp && $syncTable && $step && $srcServer && $srcDatabase && $srcUser && $srcPass && $destServer && $destDatabase && $destUser && $destPass ) { print "usage: $0 --table= --src=host:database:login:password --dest=host:database:login:password --step= \\\n"; print " [--begin=] [--end=] [--slow=X:Y] [--updateonly] [--verbose] [--readonly] [--debug]\n"; print "\n"; print "Required Arguments\n"; print " table Which table you'll sync, must have identical def & an 'id' column\n"; print " src Source database to read from (hostname, databasename, loginId, password)\n"; print " dest Sync source database data to here (hostname, databasename, loginId, password)\n"; print " step How many IDs to consider at a time (smaller values are better for concurrency)\n"; print "Optional Arguments\n"; print " begin The first ID from table to start considering (must be >0)\n"; print " end The last ID from table to finish considering (must be >0)\n"; print " slow Pause after each STEP rows for \$queryTime*X+Y seconds\n"; print " updateonly Do not insert/delete anything, but only update\n"; print " verbose Turn on verbosity output\n"; print " readonly Do not attempt to update/insert/delete anything\n"; print " debug Turn on very verbose debugging output\n"; exit 0; } ### # servers to go from/to -- Testing! ### print "!!!!!!!!!!! OVERRIDING ALL INPUT PARAMETERS!!!!!!!!!!!!!!!!!!!\n"; ### my $srcServer = "localhost"; my $srcDatabase = "testFr"; ### my $srcUser = "time"; my $srcPass = "time"; ### my $destServer = "localhost"; my $destDatabase = "testGal"; ### my $destUser = "time"; my $destPass = "time"; # sort of informational log stuff at the top of a run doLog ("-- Beginning a new run -----------------------------------------------"); if ($debugMode) { doLog ("All SQL statements starting with *** will not actually be submitted to the database"); } doLog ("Will sleep ${pauseX}x+${pauseY} seconds between each range (where x=number of seconds range took)"); #connect to the DBs doLog ("Opening database handles"); my $dbh1 = DBI->connect("dbi:mysql::$srcServer", $srcUser, $srcPass, , { RaiseError => 0}); if (!defined $dbh1) { die ("$!"); } doQuery($dbh1, "set session wait_timeout=28800"); my $dbh2 = DBI->connect("dbi:mysql::$destServer", $destUser, $destPass, , { RaiseError => 0}); if (!defined $dbh2) { die ("$!"); } doQuery($dbh2, "set session wait_timeout=28800"); $dbh1->do("use $srcDatabase"); $dbh2->do("use $destDatabase"); # get total number of rows in each table doLog ("Getting maximum IDs"); my $srcMaxId = $endHere || getMaxId($dbh1); my $destMaxId = $endHere || getMaxId($dbh2); doLog ("IDs I will stop at: Source=$srcMaxId Destination=$destMaxId"); $totalSetSize1 = $srcMaxId - $beginHere; $totalSetSize2 = $destMaxId - $beginHere; doLog ("----------------------------------------------------------------------"); # get a chunk of data to work on my $syncBottom = $beginHere; my $syncTop = $syncBottom + $step - 1; # and loop through both tables doing work while ($syncBottom <= $srcMaxId || $syncBottom <= $destMaxId) { $globalLogIndent++; my $firstTiming = time(); # if they passed an --end parameter, stop there if ($syncTop > $srcMaxId) { $syncTop = $srcMaxId; } my $pctComplete1 = sprintf ("%2.2f%%", $syncTop / $srcMaxId * 100); my $pctComplete2 = sprintf ("%2.2f%%", $syncTop / $destMaxId * 100); doLog ("Working on range :: $syncBottom .. $syncTop -- pctComplete==$pctComplete1/$pctComplete2"); my $window1 = &getWindow ($dbh1, $syncBottom, $syncTop); my $window2 = &getWindow ($dbh2, $syncBottom, $syncTop); # only compare the windows and do work if they both # contain rows if (scalar @$window1 == 0 && scalar @$window2 == 0) { doLog ("Range has no results in either table, skipping to next range."); } else { $globalLogIndent++; doWindows($dbh1, $dbh2, $window1, $window2); $globalLogIndent--; } $syncBottom += $step; $syncTop = $syncBottom + $step - 1; my $secondTiming = time(); # if we're in slow mode, do some sleeping if ($slow) { # sleep a minimum of 1 seconds my $sleepInterval = ($secondTiming - $firstTiming) * $pauseX + $pauseY ; # this code represents "some actual deleting went on" if ($sleepInterval > 0) { if ($verboseMode || $debugMode) { doLog ("Sleeping $sleepInterval seconds (${pauseX}x+${pauseY} length of time to perform query)"); } sleep $sleepInterval; } } $globalLogIndent--; } doLog ("-- Done processing. Exiting. -----------------------------------------"); exit 0; # ----------------------------------------------------------- sub doWindows { my $dbh1 = shift; my $dbh2 = shift; my $window1 = shift; my $window2 = shift; my $delCount = 0; my $insCount = 0; my $updCount = 0; ### if ($debugMode) { ### print "window1\n"; ### print Data::Dumper->Dump($window1); ### print "window2\n"; ### print Data::Dumper->Dump($window2); ### } ### if ($debugMode) { ### print "ids in window1\n"; ### foreach my $row (@$window1) { ### print "debug: id == " . $row->{ id } . "\n"; ### } ### print "ids in window2\n"; ### foreach my $row (@$window2) { ### print "debug: id == " . $row->{ id } . "\n"; ### } ### } my $row1; my $row2; $row1 = shift (@$window1); $row2 = shift (@$window2); while (defined ($row1->{id}) || defined ($row2->{id})) { if ($debugMode) { if (defined ($row1->{id})) { doLog ("debug: Working on row1Id=" . $row1->{id}); } if (defined ($row2->{id})) { doLog ("debug: ...and row2Id=" . $row2->{id}); } } &doSpecialCases ($syncTable, 'always', $dbh1, $dbh2, $row1, $row2); do { if (defined ($row1) && defined ($row1->{id}) && $row1->{id} eq 'skip') { $row1 = shift (@$window1); } if (defined ($row2) && defined ($row2->{id}) && $row2->{id} eq 'skip') { $row2 = shift (@$window1); } &doSpecialCases ($syncTable, 'always', $dbh1, $dbh2, $row1, $row2); } while ((defined ($row1) && defined($row1->{id}) && $row1->{id} eq 'skip') || (defined ($row2) && defined($row2->{id}) && $row2->{id} eq 'skip')); # if the id value is lesser, insert # (undefined is the largest value) $globalLogIndent++; if (!defined $row2->{id} && !defined $row1->{id}) { #do nothing # You cannot eliminate this apparently-meaningless check, # because it might be necessary after doSpecialCases() does # its work $row1 = shift (@$window1); $row2 = shift (@$window2); } elsif (defined $row1->{id} && !defined $row2->{id}) { if ($debugMode) { doLog ("Undef id in row2: Need to Insert"); } &insertRow ($dbh2, $row1); $insCount++; $totalIns++; $row1 = shift (@$window1); $row2 = shift (@$window2); } elsif (!defined $row1->{id} && defined $row2->{id}) { if ($debugMode) { doLog ("Undef id in row1: Need to Delete"); } &deleteRow ($dbh2, $row2); $delCount++; $totalDel++; $row1 = shift (@$window1); $row2 = shift (@$window2); } elsif ($row1->{id} eq $row2->{id}) { if (&updateRow ($dbh1, $dbh2, $row1, $row2)) { $updCount++; $totalUpd++; } $row1 = shift (@$window1); $row2 = shift (@$window2); } elsif ($row1->{id} < $row2->{id}) { &insertRow ($dbh2, $row1); $insCount++; $totalIns++; $row1 = shift (@$window1); } elsif ($row1->{id} > $row2->{id}) { &deleteRow ($dbh2, $row2); $delCount++; $totalDel++; $row2 = shift (@$window2); } else { doLog ("Ummm: Honestly, I don't think this should happen."); $row1 = shift (@$window1); $row2 = shift (@$window2); } $globalLogIndent--; } # my $pctComplete1 = sprintf ("%2.2f%%", $syncTop / $srcMaxId * 100); # my $pctComplete2 = sprintf ("%2.2f%%", $syncTop / $destMaxId * 100); #if ($delCount || $insCount || $updCount) { doLog ("--- pctComplete=$pctComplete1/$pctComplete2 -- delCount=$delCount/$totalDel -- insCount=$insCount/$totalIns -- updCount=$updCount/$totalUpd"); } if ($delCount || $insCount || $updCount) { doLog ("delCount=$delCount/$totalDel -- insCount=$insCount/$totalIns -- updCount=$updCount/$totalUpd"); } } sub doSpecialCases { my $tableName = shift; my $action = shift; my $dbh1 = shift; my $dbh2 = shift; my $row1 = shift; my $row2 = shift; # do site-specific stuff here # note: used to have Friendster-specific cases, but deleted # for public consumption } sub getWindow { my $dbh = shift; my $bottom = shift; my $top = shift; my $res; my $query = "select * from $syncTable where id >= $bottom and id <= $top order by id"; do { $res = $dbh->selectall_arrayref($query, { Columns=>{} }); # check for exceptions... # if none, get the result-set if ($DBI::errstr) { doLog ("!!!ERROR :: Query==$query --- Error==$DBI::errstr"); if ($globalErrorCount++ > $maxDatabaseErrors) { die "\n. In getWindow() :: I've gotten $maxDatabaseErrors errors from the database(s). Something is wrong. Giving up"; } # give the DB some time to think about life sleep 1; } } while ($DBI::errstr); return $res; } # routines to delete, insert, or update sub deleteRow { my $dbh = shift; my $row = shift; my $action; if ($updateOnly) { $action = "updateonly mode, following query log only"; } else { $action = "deleting"; } my $query = "delete from $syncTable where id = " . $row->{id}; if ($verboseMode || $debugMode) { doLog ("$action id=" . $row->{id}); } if ($updateOnly) { doLog ($query); } else { doQuery($dbh, $query); } } sub insertRow { my $dbh = shift; my $row = shift; my $valuesToIns; my $keyColumns; my $action; if ($updateOnly) { $action = "updateonly mode, following query log only"; } else { $action = "inserting"; } foreach my $key (keys %$row) { if (defined $row->{$key}) { ### doLog ("debug: doing key=$key val=" . $row->{$key}); # this is usually pretty excessive output my $value = &sanitiseString ($row->{$key}); $valuesToIns .= "$value,"; $keyColumns .= "$key,"; } } $valuesToIns =~ s/,$//; $keyColumns =~ s/,$//; my $query = "insert into $syncTable ($keyColumns) values ($valuesToIns)"; if ($verboseMode || $debugMode) { doLog ("$action id=" . $row->{id} . " $valuesToIns"); } if ($updateOnly) { doLog ($query); } else { doQuery($dbh, $query); } } sub updateRow { my $dbh1 = shift; my $dbh2 = shift; my $row1 = shift; my $row2 = shift; my $updateClause; # convoluted logic -- all i'm really interested in is # $rowX->{$key} are differing, but the undef possibility # makes me have to add all that extra crap foreach my $key (keys %$row1) { unless (!defined $row1->{$key} && !defined $row2->{$key}) { if ( (!defined $row1->{$key} && defined $row2->{$key}) || (defined $row1->{$key} && !defined $row2->{$key}) || ($row1->{$key} ne $row2->{$key}) ) { my $value = &sanitiseString ($row1->{$key}); $updateClause .= "$key=$value,"; } } } if ($updateClause) { $updateClause =~ s/,$//; my $query = "update $syncTable set $updateClause where id = " . $row2->{id}; if ($verboseMode || $debugMode) { doLog ("Updating id=" . $row1->{id} . " $updateClause"); } doQuery($dbh2, $query); doSpecialCases($syncTable, 'update', $dbh1, $dbh2, $row1, $row2); return 1; } else { if ($debugMode) { doLog ("debug: Didn't need to update id=" . $row1->{id}); } return 0; } } # if the string is all numerics, then just return it, # otherwise, return a sanitised string with single quotes, # and escaped single quotes inside sub sanitiseString { my $str = shift; ### print "strIn=$str -- "; # pretty verbose if (defined ($str)) { # number is +/-DDD +/-0.DDD or DDD unless 0DDD (which is a string) if ($str =~ /^0+\d+$/ || ($str !~ /^[\-,\+]*\d+$/ && $str !~ /^[\-,\+]*0\.\d+$/ && $str !~ /^[\-,\+]*\d+\.\d+$/)) { $str =~ s/'/''/g; $str =~ s/\\/\\\\/g; $str = "'$str'"; } } else { $str = "NULL"; } ### print "strOut=$str\n"; # pretty verbose return $str; } # get the max IDs from both tables sub getMaxId { my $dbh = shift; my $res; my $ret; $query = "select max(id) from $syncTable"; $ret = doQuery($dbh, $query); return $ret; } # print the current date sub printDate { my $date = strftime "%Y-%m-%d %H:%M:%S ", localtime; return $date; } # mostly meant to just do a query, but if you have a single # row resultset, return that to the caller sub doQuery { my $dbh = shift; my $query = shift; # in debug mode, do not issue insert/update/delete statements if ($readOnly && ($query =~ /insert/i || $query =~ /update/i || $query =~ /delete/i || $query =~ /replace/i)) { doLog (" - Readonly: Would have executed '$query'"); } else { if ($debugMode || $verboseMode) { doLog ("debug: " . $query); } my $sth = $dbh->prepare($query); my $rv = $sth->execute(); my $rowsAffected = $sth->rows; if ($debugMode) { doLog ("Statement Handle. Return: $rv ; Rows: $rowsAffected"); } # check for exceptions... # if none, get the result-set if ($DBI::errstr) { doLog ("!!!ERROR :: Query==$query --- Error==$DBI::errstr"); if ($globalErrorCount++ > $maxDatabaseErrors) { die "\n. In doQuery() :: I've gotten $maxDatabaseErrors errors from the database(s). Something is wrong. Giving up"; } } elsif ($query =~ /select/) { my @res = $sth->fetchrow_array; my $ret = $res[0]; return $ret; } } } # output a log message sub doLog { my $msg = shift; my $i; # indent some lines for ($i = 0; $i < $globalLogIndent; $i++) { print ". "; } my $date = strftime "%Y-%m-%d %H:%M:%S", localtime; print "$date :: $msg\n"; }