module Buryspam::Bayesian

The Bayesian module initializes, stores, and loads the database of word probabilities and blacklisted/whitelisted IP addresses used during the filtering process. The time at which the initialization occurred is also stored. The 'database' is a serialized hash; an abridged example is given below. Note that the :blacklist IP addresses are stored as a hash nested by octet, which makes searching for common octets faster.

{:timestamp=>Wed Oct 01 10:19:28 -0230 2008,
 :word_probs=>
  {"shiny"=>"0.29",
   ...
   "thq.com"=>"0.99",
   "pint."=>"0.99"},
 :blacklist=>
  {33=>
    {148=>{32=>{205=>1}},
     44=>{159=>{104=>2}},
     72=>{224=>{125=>1}},
     182=>{168=>{171=>1}},
     216=>{83=>{46=>1}},
     227=>{87=>{41=>1}},
     24=>{110=>{137=>1}, 196=>{81=>1}},
     114=>{151=>{225=>1}},
     175=>{160=>{38=>1}}},
   ...
   201=>
    {18=>
      {38=>{166=>2},
       126=>{16=>2},
       ...
       63=>{172=>1, 58=>3},
       169=>{13=>2}},
     ...
     92=>
      {38=>{237=>1},
       82=>{237=>1},
       ...
       208=>{41=>1},
       142=>{54=>1}}}},
 :whitelist=>
  ["10.100.21.85",
   "198.165.43.133",
   ...
   "207.171.180.180",
   "69.136.253.7"]}
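
To make the nested :blacklist layout concrete, here is a minimal sketch of a lookup that walks the hash one octet at a time. The helper name blacklist_count is hypothetical and is not part of buryspam.rb; the sample data is a small fragment shaped like the example above.

```ruby
# Hypothetical helper (not part of buryspam.rb): walk the nested octet hash.
# Returns the count stored for a blacklisted address, or nil if it is absent.
def blacklist_count(blacklist, ip)
  node = blacklist
  ip.split(".").each do |octet|
    # Stop early if we ran out of nesting before running out of octets.
    return nil unless node.is_a?(Hash)
    node = node[octet.to_i]
  end
  # Only a leaf (an Integer count) means the full address was found.
  node.is_a?(Integer) ? node : nil
end

# A fragment shaped like the :blacklist example above.
blacklist = {
  33 => {
    148 => { 32 => { 205 => 1 } },
    44  => { 159 => { 104 => 2 } }
  }
}

p blacklist_count(blacklist, "33.44.159.104")
p blacklist_count(blacklist, "33.44.159.105")
```

Addresses that share leading octets share the same sub-hashes, so a miss is detected as soon as one octet is not found.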

Constants

Bx
Gx
ORDER

Public Class Methods

db()

Load and return the Bayesian database. Raises a Bayesian::DatabaseError exception if the database is missing or empty, or if it lacks a timestamp, blacklisted IPs, whitelisted IPs, or word probabilities.

# File buryspam.rb, line 4393
def db
  # Should really be using the 'once' trick used by date.rb stdlib.
  return @db unless @db.nil?
  Logger.debug("Loading Buryspam database.")
  # While filtering, load only the database; ignore the caches.
  @db = Hashbase.load(Config.word_file)
  if @db.nil? || @db.empty?
    raise DatabaseError,
          "Invalid or missing Bayesian database:\n" +
          "\t'#{Config.word_file}'.\n" +
          "(Initialize it with '#{$0} --init')."
  end
  raise DatabaseError, "No timestamp."          if @db[:timestamp].nil?
  raise DatabaseError, "No blacklisted IPs."    if @db[:blacklist].empty?
  raise DatabaseError, "No whitelisted IPs."    if @db[:whitelist].empty?
  raise DatabaseError, "No word probabilities." if @db[:word_probs].empty?
  @db
end
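
The checks performed by db can be sketched in isolation as follows. This is only an illustration: DatabaseError here is a stand-in class, validate_db is a hypothetical name, and, unlike the listing above, the sketch also treats a missing section key as an error rather than calling empty? on nil.

```ruby
# Hypothetical stand-in for Bayesian::DatabaseError.
class DatabaseError < StandardError; end

# Check that a loaded hash has the four sections shown in the example hash.
def validate_db(db)
  raise DatabaseError, "Invalid or missing Bayesian database." if db.nil? || db.empty?
  raise DatabaseError, "No timestamp." if db[:timestamp].nil?
  {
    blacklist:  "No blacklisted IPs.",
    whitelist:  "No whitelisted IPs.",
    word_probs: "No word probabilities."
  }.each do |key, message|
    section = db[key]
    # Treat a missing key the same as an empty section.
    raise DatabaseError, message if section.nil? || section.empty?
  end
  db
end

db = {
  timestamp:  Time.now,
  blacklist:  { 33 => { 44 => { 159 => { 104 => 2 } } } },
  whitelist:  ["10.100.21.85"],
  word_probs: { "shiny" => "0.29" }
}
validate_db(db)
```
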
init()

Initialize the Bayesian database. Only one initialization can be performed at a time. Displays benchmark information after initialization.

# File buryspam.rb, line 4369
def init
  begin
    Lockfile.only_one { |lockfile|
      Spam.move_missed_spam_file

      time = Benchmark.realtime {
        init_db
        Status.print("  Saving word database: ")
        Hashbase.save(Config.word_file => @db)
        Status.puts("%.2fMiB." %
                     (File.size(Config.word_file).to_f / (1024**2)))
      }
      min = time.to_i / 60
      min = min.zero? ? "" : "#{min}m"
      sec = time.to_i % 60
      Status.puts("\nInitialization time: #{min}#{sec}s")
    }
  rescue Lockfile::AlreadyLockedError
    Status.error($!.message)
  end
end
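
The elapsed-time formatting used by init can be tried on its own. The sketch below assumes a hypothetical helper named format_duration; it mirrors the minutes/seconds logic in the listing above and uses Benchmark.realtime from the standard library to time a throwaway block.

```ruby
require "benchmark"

# Hypothetical helper mirroring the minutes/seconds formatting in init.
def format_duration(seconds)
  total = seconds.to_i
  min = total / 60
  sec = total % 60
  # Omit the minutes part entirely when it is zero.
  min.zero? ? "#{sec}s" : "#{min}m#{sec}s"
end

elapsed = Benchmark.realtime { 1_000.times { |i| i * i } }
puts "Initialization time: #{format_duration(elapsed)}"
puts format_duration(125.7)
puts format_duration(42.3)
```
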
value(samples)

Given a hash of sample words => probabilities, return the combined Bayesian value p1 / (p1 + p2), where p1 is the product of the word probabilities and p2 is the product of their complements.

# File buryspam.rb, line 4414
def value(samples)
  probs = samples.collect{ |word, prob| prob }
  p1 = probs.inject(1.0) { |prod, prob| prod * prob }
  p2 = probs.inject(1.0) { |prod, prob| prod * (1.0-prob) }
  p1 / (p1 + p2)
end
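
A small worked example may help. The sketch below is a hypothetical standalone version of the same calculation (combined_probability is not a name from buryspam.rb). Because the example database stores probabilities as strings such as "0.99", the sketch converts each value with Float(); how the real code converts them before calling value is not shown in this section.

```ruby
# Hypothetical standalone version of the combination step in value.
def combined_probability(samples)
  # Float() accepts both numbers and numeric strings such as "0.99".
  probs = samples.values.map { |prob| Float(prob) }
  p1 = probs.inject(1.0) { |prod, prob| prod * prob }
  p2 = probs.inject(1.0) { |prod, prob| prod * (1.0 - prob) }
  p1 / (p1 + p2)
end

samples = { "thq.com" => "0.99", "shiny" => "0.20" }
# p1 = 0.99 * 0.20 = 0.198 and p2 = 0.01 * 0.80 = 0.008,
# so the result is 0.198 / 0.206.
puts "%.4f" % combined_probability(samples)
```

A single strongly spammy word outweighs a mildly good one here, which is the intended effect of multiplying probabilities rather than averaging them.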

Private Class Methods

accumulate(gbi, counts)

Given a good/bad index and a hash of cache counts, accumulate the word counts and IP address counts into the database counters, and return the number of messages in the cache counts. Used exclusively by the ::update_db_counters method.

# File buryspam.rb, line 4602
def accumulate(gbi, counts)
  Mbox::COUNTERS.each { |counter|
    counts[counter].each { |item, num|
      gb_count = @db_counters[counter].fetch(item, [0, 0])
      gb_count[gbi] += num
      @db_counters[counter][item] = gb_count
    }
  }
  counts[:num_msgs]
end
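
The per-item merge inside accumulate can be illustrated with plain hashes. In this sketch, merge_counts is a hypothetical name, and the slot order GOOD = 0, BAD = 1 is an assumption; this section lists the constants Gx and Bx but does not show their values.

```ruby
# Hypothetical standalone version of the per-item merge in accumulate.
# counters maps item => [good_count, bad_count]; gbi selects the slot to bump.
def merge_counts(counters, gbi, counts)
  counts.each do |item, num|
    pair = counters.fetch(item, [0, 0]) # fresh pair for unseen items
    pair[gbi] += num
    counters[item] = pair
  end
  counters
end

GOOD = 0 # assumed slot order; the values of Gx/Bx are not shown in the source
BAD  = 1

words = {}
merge_counts(words, GOOD, { "hello" => 2 })
merge_counts(words, BAD,  { "hello" => 3, "viagra" => 5 })
p words
```
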
analyze_ips()

Partition the IP addresses into good and bad addresses. Create a list of good IP addresses (whitelist) and a hashified table of the octets comprising the bad IP addresses (blacklist). Returns the whitelist and blacklist.

# File buryspam.rb, line 4507
def analyze_ips()
  Status.print("Analyzing IP Addresses: ")

  s = Status.new("partitioning: ")
  ipaddrs = @db_counters[:ipaddrs]
  prog = Progress.new(ipaddrs, :partition) { |ipaddr, counts|
    counts[Gx].zero?
  }
  bad_ips, good_ips = prog.result

  s.update("good IP list: ")
  whitelist = []
  Progress.new(good_ips, :each) { |ip, counts|
    whitelist << ip
  }

  s.update("bad IP table: ")
  blacklist = {}
  Progress.new(bad_ips, :each) { |ip, counts|
    octets = ip.split(".").map{|i| i.to_i}
    last_idx = octets.size - 1
    b = blacklist
    octets.each_with_index { |oct, idx|
      b[oct] ||= (idx == last_idx ? counts[Bx] : {})
      b = b[oct]
    }
  }
  s.finish

  Status.puts("%d/%d good/bad IP addresses added." %
               [good_ips.size, bad_ips.size])
  return whitelist, blacklist
end
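
Stripped of progress reporting, the partition-and-nest step looks roughly like the sketch below. The name split_ips and the index values GX = 0 and BX = 1 are assumptions made for illustration. As in the listing above, an address counts as bad only when its good count is zero, so an address seen in both good and bad mail ends up on the whitelist.

```ruby
GX = 0 # assumed index of the good count; not shown in the source
BX = 1 # assumed index of the bad count; not shown in the source

# Hypothetical standalone version of the partition and nesting in analyze_ips.
# ipaddrs maps "a.b.c.d" => [good_count, bad_count].
def split_ips(ipaddrs)
  bad, good = ipaddrs.partition { |_ip, counts| counts[GX].zero? }
  whitelist = good.map { |ip, _counts| ip }

  blacklist = {}
  bad.each do |ip, counts|
    octets = ip.split(".").map(&:to_i)
    leaf = octets.pop
    # Descend through (or create) one hash level per leading octet.
    node = octets.inject(blacklist) { |hash, oct| hash[oct] ||= {} }
    node[leaf] ||= counts[BX]
  end
  [whitelist, blacklist]
end

ipaddrs = {
  "10.0.0.1"      => [3, 0],
  "33.44.159.104" => [0, 2],
  "33.44.159.7"   => [0, 1],
  "5.5.5.5"       => [1, 4]
}
whitelist, blacklist = split_ips(ipaddrs)
p whitelist
p blacklist
```
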
calculate_probs()

After filtering out words that are to be ignored or whose length falls outside the range specified in the configuration, calculate the Bayesian probability of each remaining word and store the words and their corresponding probabilities in a hash.

# File buryspam.rb, line 4545
def calculate_probs
  Status.print("    Word probabilities: ")

  probs = {}
  num_good_msgs = @db_counters[:num_msgs][Gx]
  num_bad_msgs  = @db_counters[:num_msgs][Bx]

  # Retrieve these values from Config so that we are not
  # doing a lot of redundant hash lookups in the loop below.
  ignore_words      = Config.ignore_words
  word_length       = Config.word_length
  min_word_num      = Config.min_word_num
  good_init_weight  = Config.good_init_weight
  bad_init_weight   = Config.bad_init_weight
  ignore_probs      = Config.ignore_probs
  bad_prob          = Config.bad_prob
  good_prob         = Config.good_prob
  precision         = Config.precision

  good_words = bad_words = 0

  s = Status.new("preprocessing...")
  words = @db_counters[:words].to_a
  s.update("filtering: ")
  Progress.new(words, :delete_if) { |word, counts|
    ignore_words =~ word || ! (word_length === word.length)
  }

  s.update("calculating: ")
  Progress.new(words, :each) { |word, counts|
    good, bad = counts[Gx], counts[Bx]
    g = good_init_weight * good
    b = bad_init_weight * bad

    next if g + b < min_word_num
    gf = g.to_f / num_good_msgs; gf = 1.0 if gf > 1.0
    bf = b.to_f / num_bad_msgs;  bf = 1.0 if bf > 1.0
    br = bf / (gf + bf)
    next if ignore_probs === br
    prob = br > bad_prob ? bad_prob :
           br < good_prob ? good_prob : br
    probs[word] = "%.*f" % [precision, prob]
    if prob < Message::NEUTRAL
      good_words += 1
    elsif prob > Message::NEUTRAL
      bad_words += 1
    end
  }
  s.finish
  Status.puts("%d/%d good/bad word probabilities added." %
               [good_words, bad_words])
  probs
end
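
The arithmetic for a single word can be followed with concrete numbers. The sketch below uses a hypothetical word_prob function, and the SETTINGS values are illustrative placeholders only; the real values come from Config and are not shown in this section. The ignore_probs check is omitted for brevity.

```ruby
# Illustrative settings only; the real values come from Config.
SETTINGS = {
  good_init_weight: 2, bad_init_weight: 1, min_word_num: 5,
  good_prob: 0.01, bad_prob: 0.99, precision: 2
}.freeze

# Hypothetical standalone version of the per-word step in calculate_probs.
# Returns the formatted probability string, or nil if the word is too rare.
def word_prob(good, bad, num_good_msgs, num_bad_msgs, s = SETTINGS)
  g = s[:good_init_weight] * good
  b = s[:bad_init_weight] * bad
  return nil if g + b < s[:min_word_num]

  gf = [g.to_f / num_good_msgs, 1.0].min # weighted good frequency, capped at 1
  bf = [b.to_f / num_bad_msgs, 1.0].min  # weighted bad frequency, capped at 1
  br = bf / (gf + bf)
  prob = br.clamp(s[:good_prob], s[:bad_prob])
  "%.*f" % [s[:precision], prob]
end

# 3 good and 10 bad occurrences, with 100 good and 50 bad messages:
# gf = 6/100 = 0.06, bf = 10/50 = 0.2, br = 0.2 / 0.26
p word_prob(3, 10, 100, 50)
p word_prob(0, 5, 100, 50) # seen only in bad mail, clamped to bad_prob
p word_prob(1, 0, 100, 50) # too rare under these settings
```
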
init_db()

Low-level function to set up and initialize the database hash structure.

# File buryspam.rb, line 4425
def init_db
  @db_counters = {
    :num_msgs => [0, 0],
    :ipaddrs  => {},
    :words    => {}
  }

  ORDER.each { |gb| process(gb) }

  nb = @db_counters[:num_msgs][Bx]
  ng = @db_counters[:num_msgs][Gx]
  raise "No good/bad words?" if nb + ng == 0
  Status.print("        Total messages: ")
  Status.puts("%d/%d good/bad messages processed (%d%% spam)" %
                  [ng, nb, nb / (ng + nb).to_f * 100])

  whitelist, blacklist = analyze_ips
  @db = {
    :timestamp   => Time.now,
    :blacklist   => blacklist,
    :whitelist   => whitelist,
    :word_probs  => calculate_probs
  }
end
process(gb)

Scan over the directories containing either the good or bad mboxes. gb is :good or :bad.

# File buryspam.rb, line 4452
def process(gb)
  Status.puts("Processing #{gb} directories:")
  # Get the bad_dirs/good_dirs/bad_init_date_range/good_init_date_range
  # configuration values dynamically.
  directories     = Config.send(gb.to_s + '_dirs')
  @init_date_range = Config.send(gb.to_s + '_init_date_range')
  directories.each { |dir|
    unless FileUtils.dir_absolute?(dir)
      raise "Directory not absolute: '#{dir}'"
    end
    process_directory(dir, gb)
  }
  Status.puts
end
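
The dynamic lookup of good_dirs or bad_dirs relies on building a method name from the symbol. A minimal sketch of that pattern follows; DemoConfig, its directory values, and dirs_for are all made up for illustration and stand in for the real Config.

```ruby
# Hypothetical stand-in for Config, with made-up directory values.
module DemoConfig
  def self.good_dirs
    ["/home/user/mail/good"]
  end

  def self.bad_dirs
    ["/home/user/mail/spam"]
  end
end

# Build the accessor name from :good or :bad and call it.
def dirs_for(gb, config = DemoConfig)
  config.public_send("#{gb}_dirs")
end

[:good, :bad].each { |gb| puts "#{gb}: #{dirs_for(gb).join(', ')}" }
```
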
process_directory(dir, gb)

Scan over the files in dir containing either the good or bad mboxes. gb is :good or :bad.

# File buryspam.rb, line 4469
def process_directory(dir, gb)
  unless File.directory?(dir)
    warning = "'#{dir}' directory does not exist."
    Status.warn(" WARNING: #{warning}")
    return
  end

  Status.puts(" #{dir}:")

  no_mboxes = true
  Dir[File.join(dir, '*')].sort.each { |filename|
    next unless File.file?(filename)
    Status.print("   %20.20s : " % File.basename(filename))
    unless Mbox.is_valid?(filename)
      Status.puts("Not a valid mbox.")
      next
    end
    if Config.ignore_mboxes =~ filename
      Status.puts("Mbox ignored.")
      next
    end
    no_mboxes = false
    cache = status = nil
    time = Benchmark.realtime {
      cache = Cache.new(gb, filename, @init_date_range)
      status = update_db_counters(gb, cache)
    }
    Status.puts(status.nil? ?  "No messages in range." :
          status + " %4.1fs (%.2f ms/msg)" %
            [time.to_f, time.to_f / cache[:num_msgs] * 1e3])
  }
  Status.warn(" No mbox files found in #{dir}") if no_mboxes
end
update_db_counters(gb, cache)

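Update the word, IP address, and message counters from the given cache. Returns a status string describing how many messages were used, or nil if the cache has no counts or no messages fall within the date range.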

# File buryspam.rb, line 4614
def update_db_counters(gb, cache)
  return if cache.counts.nil?

  status = Status.new("updating counters...")

  gbi = ORDER.index(gb)

  if cache[:count_type] == :total
    num_msgs = accumulate(gbi, cache.counts[:total])
  else
    num_msgs = 0
    cache.counts.each { |time, counts|
      next unless @init_date_range.cover?(time)
      num_msgs += accumulate(gbi, counts)
    }
  end

  some = all = ""
  if num_msgs == cache[:num_msgs]
    all = "(full cache)"
  else
    some = "/#{cache[:num_msgs]}"
  end

  @db_counters[:num_msgs][gbi] += num_msgs
  status.finish
  return if num_msgs.zero?

  "%5d#{some} message%s used #{all}".pluralize(num_msgs, " ")
end
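
The date-range filtering in the non-total branch depends on Range#cover?. The sketch below shows the idea with a hypothetical messages_in_range helper and made-up per-time message counts; it assumes the cache keys are Time objects, which this section does not confirm.

```ruby
# Hypothetical helper: sum only the message counts whose time is in range.
def messages_in_range(counts_by_time, range)
  counts_by_time.sum { |time, num| range.cover?(time) ? num : 0 }
end

range = Time.utc(2008, 9, 1)..Time.utc(2008, 10, 1)
counts_by_time = {
  Time.utc(2008, 8, 31) => 4, # before the range, skipped
  Time.utc(2008, 9, 15) => 7, # inside the range
  Time.utc(2008, 10, 1) => 2  # on the inclusive end, counted
}
p messages_in_range(counts_by_time, range)
```

When the sum is smaller than the total number of messages in the cache, the status string reports both numbers; otherwise it notes that the full cache was used.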