require 'tempfile'

# A lazily composed chain of stream-processing stages rooted at a file.
#
# Nothing is opened or read while the chain is being built: every builder
# method (transform, transform_all, skip_to) only wraps the previously stored
# operation in a new lambda. The chain runs when one of the terminal methods
# (consume_all, consume, pipe_to) is called, and it can be run more than once
# because each run re-opens the file.
class Pipeline
  # file - argument handed to File.open each time the pipeline is run.
  def initialize(file)
    # Root operation: open the file, pass the handle to the continuation, and
    # let the block form of File.open close it afterwards.
    @op = lambda { |nxt| File.open(file) { |fp| nxt.call(fp) } }
  end

  # Appends a streaming stage.
  #
  # transformation - block called as transformation.call(input, output); it
  #                  reads from the upstream handle and writes its result to
  #                  the output handle. It must not close the output handle.
  #
  # Returns the new composed operation (a lambda).
  def transform(&transformation)
    add_buffered_stage { |fp, buffer| transformation.call(fp, buffer) }
  end

  # Appends a whole-content stage.
  #
  # transformation - block called with the complete upstream content (the
  #                  result of fp.read); its return value is written out as
  #                  the input of the next stage.
  #
  # Returns the new composed operation (a lambda).
  def transform_all(&transformation)
    add_buffered_stage { |fp, buffer| buffer.write(transformation.call(fp.read)) }
  end

  # Appends a stage that moves the read position of the current stream.
  #
  # n - position assigned to the stream with `pos=`.
  #
  # Returns the new composed operation (a lambda).
  def skip_to(n)
    add_stage do |fp, nxt|
      fp.pos = n
      nxt.call(fp)
    end
  end

  # Runs the pipeline and returns everything the final stream yields on read.
  def consume_all
    ret = nil
    @op.call(lambda { |fp| ret = fp.read })
    ret
  end

  # Runs the pipeline and hands the final stream to the given block.
  #
  # Returns the value of the block.
  def consume(&consumer)
    @op.call(lambda { |fp| consumer.call(fp) })
  end

  # Runs the pipeline and writes the final stream's content to out_fp.
  #
  # out_fp - any object responding to write.
  def pipe_to(out_fp)
    @op.call(lambda { |fp|
      # read without a length drains the stream in one call, so this loop
      # normally executes at most once; nothing is written for empty input.
      until fp.eof?
        data = fp.read
        out_fp.write(data) unless data.nil?
      end
    })
  end

  private

  # Wraps the current operation so that `stage` runs after it.
  #
  # stage - block called as stage.call(fp, nxt), where fp is the stream
  #         produced upstream and nxt is the continuation that receives the
  #         stream this stage produces.
  #
  # Returns the new composed operation, which is also stored in @op.
  def add_stage(&stage)
    previous = @op
    @op = lambda { |nxt| previous.call(lambda { |fp| stage.call(fp, nxt) }) }
  end

  # Appends a stage whose output is collected in a temporary file before the
  # next stage starts reading it.
  #
  # A pipe is unsuitable here: the stage writes its whole output before the
  # downstream reads anything, so a write larger than the pipe buffer would
  # block forever. A temporary file has no such limit and is seekable, which
  # also lets skip_to follow a transform.
  #
  # fill - block called as fill.call(fp, buffer) to write the stage output.
  #
  # Returns the new composed operation.
  def add_buffered_stage(&fill)
    add_stage do |fp, nxt|
      # The block form closes and removes the file once downstream is done.
      Tempfile.create('pipeline') do |buffer|
        fill.call(fp, buffer)
        buffer.flush
        buffer.rewind # downstream reads the output from its beginning
        nxt.call(buffer)
      end
    end
  end
end