Posts Tagged ‘eventmachine’

Simple Couch comet listener with EM

Monday, May 25th, 2009

So couch trunk has now got the long-awaited comet update-tracking functionality, obsoleting pretty much every other way of doing update notification at a stroke. I’ve been looking forward to this for a while – I want to throw an EM daemon or two on the comet URL; they’ll listen for changes and do cache invalidations/search index additions asynchronously. Yes, I could just expire the cache synchronously upon save but that gets very fiddly – I want to store the seq number in the cache so the expiration/update sequence is fully replayable. Doing that synchronously would involve another query to the DB to find the current seq, inviting race conditions – forget it. Also, I need to do message dispatch to diverse clients who might not be using the web server at all; I need all updates to flow through a router, and that can’t be in the web app.

Anyway, here’s a simple EM script which listens to the (charmingly undocumented) comet URL and does whatever you want with the updates. If you were doing anything complex you’d probably want to send the process off into a defer operation.

require 'rubygems'
require 'eventmachine'
require 'socket'
require 'json'
 
module CouchListener
  def initialize sock
    @sock = sock
  end
 
  def receive_data data
    data.each_line do |d|
      next if !d.split("\"").include?("seq")
      puts "raw: #{d.inspect}"
      begin
        json_data = JSON.parse(d)
        puts "JSON: #{json_data.inspect}"
        puts "updated: id #{json_data["id"]}, seq #{json_data["seq"]}"
      rescue Exception => e # TODO definitely do not want to rescue in production
        puts "JSON parse failed with error #{e}" 
      end
    end
  end
 
  def unbind
    EM.next_tick do
      data = @sock.read
    end
  end
end
 
CURRENT_SEQ = "0" # you'll want to replace this with whatever is current
DB_NAME = "test_comet"
 
EM.run{
  $sock = TCPSocket.new('localhost', 5984)
  $sock.write("GET /#{DB_NAME}/_changes?continuous=true&since=#{CURRENT_SEQ} HTTP/1.1\r\n\r\n")
  EM.attach $sock, CouchListener, $sock
}