require "stringio" class Pipeline def initialize(source) source = { file:source } unless source.is_a? Hash if source.has_key? :file @op = lambda { |nxt| File.open(source[:file]) { |fp| nxt.call(fp) }} elsif source.has_key? :string data = source[:string] @op = lambda { |nxt| IO.pipe { |r, w| w.write(data) w.close_write nxt.call(r) } } end end def transform(&transformation) old_op = @op @op = lambda {|nxt| old_op.call(lambda {|fp| IO.pipe {|r, w| transformation.call(fp, w) w.close_write nxt.call(r) } })} end def transform_all(&transformation) old_op = @op @op = lambda {|nxt| old_op.call(lambda {|fp| data = transformation.call(fp.read) nxt.call(StringIO.new(data)) })} end def skip_to(n) old_op = @op @op = lambda {|nxt| old_op.call(lambda {|fp| fp.pos = n; nxt.call(fp) })} end def consume_all ret = nil @op.call(lambda {|fp| ret = fp.read }) ret end def consume(&consumer) @op.call(lambda {|fp| consumer.call(fp) }) end def pipe_to(out_fp) @op.call(lambda { |fp| until fp.eof? do data = fp.read(nil) out_fp.write(data) unless data.nil? end }) end end