Simple Couch comet listener with EM

So couch trunk has now got the long-awaited comet update-tracking functionality, obsoleting pretty much every other way of doing update notification at a stroke. I’ve been looking forward to this for a while – I want to throw an EM daemon or two on the comet URL; they’ll listen for changes and do cache invalidations/search index additions asynchronously. Yes, I could just expire the cache synchronously upon save but that gets very fiddly – I want to store the seq number in the cache so the expiration/update sequence is fully replayable. Doing that synchronously would involve another query to the DB to find the current seq, inviting race conditions – forget it. Also, I need to do message dispatch to diverse clients who might not be using the web server at all; I need all updates to flow through a router, and that can’t be in the web app.
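To make the replayability point a bit more concrete, here is a minimal sketch of a cache that records the seq it last applied. It assumes each change arrives as a hash with "seq" and "id" keys (the shape the listener prints), and that seqs are integers. The SeqCache class and its method names are made up for illustration; they are not part of CouchDB or EM.

```ruby
# Hypothetical in-memory cache that remembers the last change seq it applied.
# Replaying the same stretch of the changes feed twice leaves it unchanged.
class SeqCache
  attr_reader :last_seq

  def initialize
    @store = {}     # doc id => cached value
    @last_seq = 0   # highest seq applied so far (assumes integer seqs)
  end

  def write(id, value)
    @store[id] = value
  end

  def read(id)
    @store[id]
  end

  # Apply one change from the feed. Returns true if it was applied,
  # false if it was skipped because that seq had already been seen.
  def apply_change(change)
    seq = Integer(change["seq"])
    return false if seq <= @last_seq
    @store.delete(change["id"])  # invalidate the stale entry
    @last_seq = seq
    true
  end
end

cache = SeqCache.new
cache.write("doc1", "rendered page")
cache.apply_change("seq" => 1, "id" => "doc1")  # invalidates doc1
cache.apply_change("seq" => 1, "id" => "doc1")  # replay: ignored
```

Because the seq travels with the cache, a restarted listener could resume from cache.last_seq without asking the DB what the current seq is.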

Anyway, here’s a simple EM script which listens to the (charmingly undocumented) comet URL and does whatever you want with the updates. If you were doing anything complex you’d probably want to hand the processing off to EM.defer so it doesn’t block the reactor.

require 'rubygems'
require 'eventmachine'
require 'socket'
require 'json'
 
module CouchListener
  def initialize sock
    @sock = sock
  end
 
  def receive_data data
    data.each_line do |d|
      # skip anything that isn't a change line (HTTP headers, chunk sizes, blank lines)
      next unless d.include?('"seq"')
      puts "raw: #{d.inspect}"
      begin
        json_data = JSON.parse(d)
        puts "JSON: #{json_data.inspect}"
        puts "updated: id #{json_data["id"]}, seq #{json_data["seq"]}"
      rescue JSON::ParserError => e # only catch parse failures; rescuing Exception would hide real bugs
        puts "JSON parse failed with error #{e}" 
      end
    end
  end
 
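  def unbind
    # CouchDB closed the connection; the socket is no longer readable,
    # so just report it and stop the reactor.
    puts "connection closed"
    EM.stop
  end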
end
 
CURRENT_SEQ = "0" # you'll want to replace this with whatever is current
DB_NAME = "test_comet"
 
EM.run{
  $sock = TCPSocket.new('localhost', 5984)
  # HTTP/1.1 requires a Host header
  $sock.write("GET /#{DB_NAME}/_changes?continuous=true&since=#{CURRENT_SEQ} HTTP/1.1\r\nHost: localhost:5984\r\n\r\n")
  EM.attach $sock, CouchListener, $sock
}
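
One caveat with receive_data as written: EM hands it whatever bytes happened to arrive, which need not end on a line boundary, so a JSON line can be split across two calls. Below is a minimal sketch of buffering until a full line is available, in plain Ruby so it can be tried without a CouchDB running. The LineBuffer class is a hypothetical helper, and the sample chunks only approximate what the feed sends.

```ruby
require 'json'

# Hypothetical helper: accumulates raw chunks and yields only complete lines.
class LineBuffer
  def initialize
    @buffer = +""
  end

  # Append a chunk, then yield each complete line (without its line ending).
  # Any trailing partial line stays in the buffer for the next call.
  def feed(chunk)
    @buffer << chunk
    while (line = @buffer.slice!(/\A.*?\r?\n/))
      yield line.chomp
    end
  end
end

buf = LineBuffer.new
updates = []
# Simulate one change arriving split across two TCP segments.
['{"seq":1,"id":"do', "c1\"}\n{\"seq\":2,\"id\":\"doc2\"}\n"].each do |chunk|
  buf.feed(chunk) do |line|
    next unless line.include?('"seq"')
    updates << JSON.parse(line)
  end
end
updates.each { |u| puts "updated: id #{u["id"]}, seq #{u["seq"]}" }
```

In the listener, the same idea would mean keeping one buffer per connection and calling feed from receive_data instead of data.each_line.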


2 Responses to “Simple Couch comet listener with EM”

  1. Aman Gupta Says:
    def receive_data data
      data.each_line do |d|

    would be better written as

    def receive_data data
      @data ||= ''
      @data << data
      while d = @data.slice!(/.*?\r?\n/m)

    since you cannot assume receive_data will be called with full lines due to tcp fragmentation.

  2. Sho Says:

    Thanks Aman, good catch! You would definitely want to play it safe with the incoming data, especially if you were running this against anything other than localhost.

    Still getting the hang of using sockets directly ..
