#
# The procs in this file are used for replication messaging
# ONLY when the default mechanism of setting up a queue of
# messages in an environment is not possible.  This situation
# is fairly rare, but it is necessary when a replication
# test simultaneously runs different versions of Berkeley DB,
# because different versions cannot share an env.
#
# Note, all procs should be named with the suffix _noenv
# so it's explicit that we are using them.
#
# Message files live in $qtestdir/MSGQUEUEDIR (qtestdir defaults to
# testdir).  A file being written is named temp.<to>.<from>.<pid>; once
# handed off for processing it is renamed ready.<to>.<from>.<pid>.<n>.
# Open handles are kept in the global array queuedbs, keyed by
# <to>.<from>.<pid>.
#

# Close up a replication group - close all message dbs.
#	queuedir - accepted for interface compatibility; not used here.
proc replclose_noenv { queuedir } {
	global queuedbs machids

	set dbs [array names queuedbs]
	foreach tofrom $dbs {
		set handle $queuedbs($tofrom)
		error_check_good db_close [$handle close] 0
		unset queuedbs($tofrom)
	}

	set machids {}
}

# Create a replication group for testing.
#	queuedir - directory that will hold the message files; created
#	    if it does not exist.
proc replsetup_noenv { queuedir } {
	global queuedbs machids

	file mkdir $queuedir

	# If there are any leftover handles, get rid of them.
	# (They are only forgotten here, not closed.)
	set dbs [array names queuedbs]
	foreach tofrom $dbs {
		unset queuedbs($tofrom)
	}
	set machids {}
}

# Send function for replication.
#	control, rec - the two message parts; stored verbatim.
#	fromid	- machine id of the sender.
#	toid	- machine id of the recipient, or -1 to broadcast.
#	flags	- list of flag names; "perm" and "any" are examined.
#	lsn	- LSN recorded in perm_sent_list for "perm" messages.
# Always returns 0.
proc replsend_noenv { control rec fromid toid flags lsn } {
	global is_repchild
	global queuedbs machids
	global drop drop_msg
	global perm_sent_list
	global anywhere
	global qtestdir testdir

	if { ![info exists qtestdir] } {
		set qtestdir $testdir
	}

	# Perm LSNs are only collected once the caller has seeded
	# perm_sent_list with at least one element.
	set permflags [lsearch $flags "perm"]
	if { [llength $perm_sent_list] != 0 && $permflags != -1 } {
#		puts "replsend_noenv sent perm message, LSN $lsn"
		lappend perm_sent_list $lsn
	}

	#
	# If we are testing with dropped messages, then we drop every
	# $drop_msg time.  If we do that just return 0 and don't do
	# anything.
	#
	if { $drop != 0 } {
		incr drop
		if { $drop == $drop_msg } {
			set drop 1
			return 0
		}
	}

	# XXX
	# -1 is DB_BROADCAST_EID
	if { $toid == -1 } {
		set machlist $machids
	} else {
		#
		# Default to the requested recipient.  If we can send this
		# anywhere, send it instead to the first id we find that is
		# neither toid nor fromid.  If we don't find any other
		# candidate, the default stands.
		#
		# (The default must be set up front rather than detected
		# afterwards via a sentinel in the loop variable: foreach
		# leaves its variable holding the last element visited, so
		# an unsuccessful search would otherwise leave machlist
		# unset.)
		#
		set machlist [list $toid]
		if { $anywhere != 0 && [lsearch $flags "any"] != -1 } {
			foreach m $machids {
				if { $m == $fromid || $m == $toid } {
					continue
				}
				set machlist [list $m]
				break
			}
		}
	}

	set pid [pid]
	foreach m $machlist {
		# Do not broadcast to self.
		if { $m == $fromid } {
			continue
		}
		# Find the handle for the right message file.
		set db $queuedbs($m.$fromid.$pid)
		# A failed put is deliberately ignored (best effort).
		set stat [catch {$db put -append [list $control $rec $fromid]} ret]
	}
	if { $is_repchild } {
		replready_noenv $fromid from
	}

	return 0
}

# Return the number of "ready" message files for a machine id.
#	machid	- machine id of interest.
#	tf	- "to" counts files addressed to machid; any other value
#	    counts files sent from machid.
proc replmsglen_noenv { machid {tf "to"}} {
	global queuedbs qtestdir testdir

	if { ![info exists qtestdir] } {
		set qtestdir $testdir
	}
	set queuedir $qtestdir/MSGQUEUEDIR
	# Change directories temporarily so we get just the msg file name.
	set orig [pwd]

	cd $queuedir
	if { $tf == "to" } {
		set msgdbs [glob -nocomplain ready.$machid.*]
	} else {
		set msgdbs [glob -nocomplain ready.*.$machid.*]
	}
	cd $orig
	return [llength $msgdbs]
}

# Discard all the pending messages for a particular site.
#	machid	- machine id whose messages are discarded.
#	tf	- "to" or "from"; selects which ready files are deleted.
proc replclear_noenv { machid {tf "to"}} {
	global queuedbs qtestdir testdir

	if { ![info exists qtestdir] } {
		set qtestdir $testdir
	}
	set queuedir $qtestdir/MSGQUEUEDIR
	set orig [pwd]

	# Remove the message files that were already made ready.
	cd $queuedir
	if { $tf == "to" } {
		set msgdbs [glob -nocomplain ready.$machid.*]
	} else {
		set msgdbs [glob -nocomplain ready.*.$machid.*]
	}
	foreach m $msgdbs {
		file delete -force $m
	}
	cd $orig

	# Empty the still-open temp databases.
	set dbs [array names queuedbs]
	foreach tofrom $dbs {
		# Process only messages _to_ the specified machid.
		# NOTE(review): this step ignores tf, unlike the file
		# deletion above -- confirm that is intended.
		if { [string match $machid.* $tofrom] == 1 } {
			set db $queuedbs($tofrom)
			set dbc [$db cursor]
			for { set dbt [$dbc get -first] } \
			    { [llength $dbt] > 0 } \
			    { set dbt [$dbc get -next] } {
				error_check_good \
				    replclear($machid)_del [$dbc del] 0
			}
			error_check_good replclear($db)_dbc_close [$dbc close] 0
		}
	}

	# The temp files themselves are located but intentionally left
	# in place: the delete below is disabled.
	cd $queuedir
	if { $tf == "to" } {
		set msgdbs [glob -nocomplain temp.$machid.*]
	} else {
		set msgdbs [glob -nocomplain temp.*.$machid.*]
	}
	foreach m $msgdbs {
#		file delete -force $m
	}
	cd $orig
}

# Makes messages available to replprocessqueue by closing and
# renaming the message files.  We ready the files for one machine
# ID at a time -- just those "to" or "from" the machine we want to
# process, depending on 'tf'.
proc replready_noenv { machid tf } {
	global queuedbs machids
	global counter
	global qtestdir testdir

	if { ![info exists qtestdir] } {
		set qtestdir $testdir
	}
	set queuedir $qtestdir/MSGQUEUEDIR

	#
	# Close the temporary message files for the specified machine.
	# Only close it if there are messages available.
	#
	set dbs [array names queuedbs]
	set closed {}
	foreach tofrom $dbs {
		# Keys look like <to>.<from>.<pid>; pull out the two ids.
		set toidx [string first . $tofrom]
		set toid [string replace $tofrom $toidx end]
		set fidx [expr {$toidx + 1}]
		set fromidx [string first . $tofrom $fidx]
		#
		# First chop off the end, then chop off the toid
		# in the beginning.
		#
		set fromid [string replace $tofrom $fromidx end]
		set fromid [string replace $fromid 0 $toidx]
		if { ($tf == "to" && $machid == $toid) || \
		    ($tf == "from" && $machid == $fromid) } {
			set nkeys [stat_field $queuedbs($tofrom) \
			    stat "Number of keys"]
			if { $nkeys != 0 } {
				lappend closed \
				    [list $toid $fromid temp.$tofrom]
				error_check_good temp_close \
				    [$queuedbs($tofrom) close] 0
			}
		}
	}

	# Rename the message files.
	set cwd [pwd]
	foreach filename $closed {
		set toid [lindex $filename 0]
		set fromid [lindex $filename 1]
		set fname [lindex $filename 2]
		# Strip the leading "temp." (5 characters).
		set tofrom [string replace $fname 0 4]
		# The per-machid counter keeps ready file names unique.
		incr counter($machid)
		cd $queuedir
#		puts "$queuedir: Msg ready $fname to ready.$tofrom.$counter($machid)"
		file rename -force $fname ready.$tofrom.$counter($machid)
		cd $cwd
		# Open a fresh temp file to take any future messages.
		replsetuptempfile_noenv $toid $fromid $queuedir
	}
}

# Add a machine to a replication environment.  This checks
# that we have not already established that machine id, and
# adds the machid to the list of ids.
proc repladd_noenv { machid } {
	global queuedbs machids counter qtestdir testdir

	if { ![info exists qtestdir] } {
		set qtestdir $testdir
	}
	set queuedir $qtestdir/MSGQUEUEDIR
	if { [info exists machids] } {
		if { [lsearch -exact $machids $machid] >= 0 } {
			error "FAIL: repladd_noenv: machid $machid already exists."
		}
	}

	set counter($machid) 0
	lappend machids $machid

	# Create all the databases that receive messages sent _to_
	# the new machid.
	replcreatetofiles_noenv $machid $queuedir

	# Create all the databases that receive messages sent _from_
	# the new machid.
	replcreatefromfiles_noenv $machid $queuedir
}

# Creates all the databases that a machid needs for receiving messages
# from other participants in a replication group.  Used when first
# establishing the temp files, but also used whenever replready_noenv moves
# the temp files away, because we'll need new files for any future messages.
proc replcreatetofiles_noenv { toid queuedir } {
	global machids

	foreach m $machids {
		# We don't need a file for a machid to send itself messages.
		if { $m == $toid } {
			continue
		}
		replsetuptempfile_noenv $toid $m $queuedir
	}
}

# Creates all the databases that a machid needs for sending messages
# to other participants in a replication group.  Used when first
# establishing the temp files only.  Replready moves files based on
# recipient, so we recreate files based on the recipient, also.
proc replcreatefromfiles_noenv { fromid queuedir } {
	global machids

	foreach m $machids {
		# We don't need a file for a machid to send itself messages.
		if { $m == $fromid } {
			continue
		}
		replsetuptempfile_noenv $m $fromid $queuedir
	}
}

# Open a new, empty temp message file for messages going from 'from'
# to 'to', and remember its handle in queuedbs.
#	to, from - machine ids of the recipient and the sender.
#	queuedir - directory in which the file is created.
# The file is named temp.<to>.<from>.<pid> and is opened -excl, so the
# open fails if a file by that name already exists.
proc replsetuptempfile_noenv { to from queuedir } {
	global queuedbs

	set pid [pid]
#	puts "Open new temp.$to.$from.$pid"
	set queuedbs($to.$from.$pid) [berkdb_open -create -excl -recno \
	    -renumber $queuedir/temp.$to.$from.$pid]
	error_check_good open_queuedbs \
	    [is_valid_db $queuedbs($to.$from.$pid)] TRUE
}

# Process a queue of messages, skipping every "skip_interval" entry.
# We traverse the entire queue, but since we skip some messages, we
# may end up leaving things in the queue, which should get picked up
# on a later run.
#	dbenv	- env handle whose rep_process_message is invoked.
#	machid	- machine id whose "ready" files are processed.
#	skip_interval - if nonzero, skip some records (see below).
#	hold_electp, dupmasterp, errp - optional names of caller
#	    variables; "NONE" means the caller is not interested.
# Returns the number of messages processed (skipped ones included).
proc replprocessqueue_noenv { dbenv machid { skip_interval 0 } { hold_electp NONE } \
    { dupmasterp NONE } { errp NONE } } {
	global perm_response_list
	global qtestdir testdir

	# hold_electp is a call-by-reference variable which lets our caller
	# know we need to hold an election.
	if { [string compare $hold_electp NONE] != 0 } {
		upvar $hold_electp hold_elect
	}
	set hold_elect 0

	# dupmasterp is a call-by-reference variable which lets our caller
	# know we have a duplicate master.
	if { [string compare $dupmasterp NONE] != 0 } {
		upvar $dupmasterp dupmaster
	}
	set dupmaster 0

	# errp is a call-by-reference variable which lets our caller
	# know we have gotten an error (that they expect).
	if { [string compare $errp NONE] != 0 } {
		upvar $errp errorp
	}
	set errorp 0

	set nproced 0

	# Fall back to testdir, exactly as the other _noenv procs do.
	if { ![info exists qtestdir] } {
		set qtestdir $testdir
	}
	set queuedir $qtestdir/MSGQUEUEDIR
#	puts "replprocessqueue_noenv: Make ready messages to eid $machid"

	# Change directories temporarily so we get just the msg file name.
	set cwd [pwd]
	cd $queuedir
	set msgdbs [glob -nocomplain ready.$machid.*]
#	puts "$queuedir.$machid: My messages: $msgdbs"
	cd $cwd

	foreach msgdb $msgdbs {
		set db [berkdb_open $queuedir/$msgdb]
		set dbc [$db cursor]

		error_check_good process_dbc($machid) \
		    [is_valid_cursor $dbc $db] TRUE

		for { set dbt [$dbc get -first] } \
		    { [llength $dbt] != 0 } \
		    { set dbt [$dbc get -next] } {
			# Each record holds {control rec fromid}.
			set data [lindex [lindex $dbt 0] 1]

			# If skip_interval is nonzero, we want to process
			# messages out of order.  We do this in a simple but
			# slimy way -- continue walking with the cursor
			# without processing the message or deleting it from
			# the queue, but do increment "nproced".  The way
			# this proc is normally used, the precise value of
			# nproced doesn't matter--we just don't assume the
			# queues are empty if it's nonzero.  Thus, if we
			# contrive to make sure it's nonzero, we'll always
			# come back to records we've skipped on a later call
			# to replprocessqueue.  (If there really are no records,
			# we'll never get here.)
			#
			# Skip every skip_interval'th record (and use a
			# remainder other than zero so that we're guaranteed
			# to really process at least one record on every call).
			#
			# NOTE(review): "continue" still runs the for loop's
			# own "get -next", so the cursor advances twice here
			# and the record after the skipped one is left in the
			# queue as well.  Preserved as is -- confirm intent.
			if { $skip_interval != 0 } {
				if { $nproced % $skip_interval == 1 } {
					incr nproced
					set dbt [$dbc get -next]
					continue
				}
			}

			# We need to remove the current message from the
			# queue, because we're about to end the transaction
			# and someone else processing messages might come in
			# and reprocess this message which would be bad.
			#
			error_check_good queue_remove [$dbc del] 0

			# Unlike the shared-env variant of this code, the
			# cursor stays open across rep_process_message; the
			# close that used to happen here is disabled.
			#
#			error_check_good dbc_process_close [$dbc close] 0

			set ret [catch {$dbenv rep_process_message \
			    [lindex $data 2] [lindex $data 0] \
			    [lindex $data 1]} res]

			# Save all ISPERM and NOTPERM responses so we can
			# compare their LSNs to the LSN in the log.  The
			# variable perm_response_list holds the entire
			# response so we can extract responses and LSNs as
			# needed.
			#
			if { [llength $perm_response_list] != 0 && \
			    ([is_substr $res ISPERM] || [is_substr $res NOTPERM]) } {
				lappend perm_response_list $res
			}

			if { $ret != 0 } {
				if { [string compare $errp NONE] != 0 } {
					set errorp "$dbenv $machid $res"
				} else {
					error "FAIL:[timestamp] rep_process_message returned $res"
				}
			}

			incr nproced
			if { $ret == 0 } {
				set rettype [lindex $res 0]
				#
				# Do nothing for 0 and NEWSITE
				#
				if { [is_substr $rettype HOLDELECTION] } {
					set hold_elect 1
				}
				if { [is_substr $rettype DUPMASTER] } {
					set dupmaster "1 $dbenv $machid"
				}
			}

			if { $errorp != 0 } {
				# Break on an error, caller wants to handle it.
				break
			}
			if { $hold_elect == 1 } {
				# Break on a HOLDELECTION, for the same reason.
				break
			}
			# dupmaster is either 0 or the list "1 dbenv machid",
			# so test its first element rather than the whole
			# value (which never equals 1).
			if { [lindex $dupmaster 0] == 1 } {
				# Break on a DUPMASTER, for the same reason.
				break
			}
		}
		error_check_good dbc_close [$dbc close] 0

		#
		# Check the number of keys remaining because we only
		# want to rename to done, message file that are
		# fully processed.  Some message types might break
		# out of the loop early and we want to process
		# the remaining messages the next time through.
		#
		set nkeys [stat_field $db stat "Number of keys"]
		error_check_good db_close [$db close] 0

		if { $nkeys == 0 } {
			# Swap the leading "ready." (6 characters) for "done.".
			set dbname [string replace $msgdb 0 5 done.]
			#
			# We have to do a special dance to get rid of the
			# empty messaging files because of the way Windows
			# handles open files marked for deletion.
			# On Windows, a file is marked for deletion but
			# does not actually get deleted until the last handle
			# is closed.  This causes a problem when a test tries
			# to create a new file with a previously-used name,
			# and Windows believes the old file still exists.
			# Therefore, we rename the files before deleting them,
			# to guarantee they are out of the way.
			#
			file rename -force $queuedir/$msgdb $queuedir/$dbname
			file delete -force $queuedir/$dbname
		}
	}
	# Return the number of messages processed.
	return $nproced
}