65 lines
1.3 KiB
Ruby
65 lines
1.3 KiB
Ruby
require "stringio"
|
|
|
|
# Lazily composed stream-processing pipeline.
#
# A pipeline starts from a source (a file path or an in-memory string) and
# accumulates stages (#transform, #transform_all, #skip_to). No data is read
# until a terminal method (#consume, #consume_all, #pipe_to) runs the chain.
#
# Internally every stage is a lambda that receives the next stage as a
# continuation and calls it with a readable, IO-like object. Terminal methods
# re-run the whole chain from the source each time they are called.
class Pipeline
  # @param source [String, Hash] a file path, or a Hash holding either a
  #   +:file+ key (path to open for reading) or a +:string+ key (in-memory
  #   data). +:file+ wins when both keys are present.
  # @raise [ArgumentError] if a Hash is given with neither +:file+ nor +:string+
  def initialize(source)
    source = { file: source } unless source.is_a?(Hash)

    if source.key?(:file)
      path = source[:file]
      # The file is opened per run and closed when the downstream stages return.
      @op = lambda { |nxt| File.open(path) { |fp| nxt.call(fp) } }
    elsif source.key?(:string)
      data = source[:string]
      # A read-only StringIO is used instead of an OS pipe: writing the whole
      # payload into a pipe before anyone reads from it blocks forever once
      # the payload exceeds the pipe buffer, and a pipe cannot be repositioned
      # by #skip_to. `to_s` mirrors IO#write's coercion of non-String values.
      @op = lambda { |nxt| nxt.call(StringIO.new(data.to_s, "r")) }
    else
      # Fail at construction time rather than with a NoMethodError on nil
      # when the pipeline is eventually run.
      raise ArgumentError, "source must be a path or a Hash with :file or :string"
    end
  end

  # Appends a streaming stage.
  #
  # The block is called with +(input, output)+: +input+ is the readable stream
  # produced by the previous stage and +output+ is a writable buffer. Whatever
  # the block writes to +output+ becomes the input of the next stage.
  #
  # The block's output is buffered in memory (previously it was buffered in an
  # OS pipe, which deadlocked on output larger than the pipe buffer).
  #
  # @yieldparam input [#read] stream from the previous stage
  # @yieldparam output [StringIO] buffer collecting this stage's output
  # @return [Proc] the updated internal operation
  def transform(&transformation)
    add_stage do |fp, nxt|
      buffer = StringIO.new
      transformation.call(fp, buffer)
      # Read back through a fresh StringIO over the collected bytes so the
      # next stage starts at position 0 even if the block closed +output+.
      nxt.call(StringIO.new(buffer.string))
    end
  end

  # Appends a whole-content stage.
  #
  # The block receives everything the previous stage produces as one String
  # and must return the String to hand to the next stage.
  #
  # @yieldparam data [String] full content read from the previous stage
  # @yieldreturn [String] replacement content
  # @return [Proc] the updated internal operation
  def transform_all(&transformation)
    add_stage do |fp, nxt|
      data = transformation.call(fp.read)
      nxt.call(StringIO.new(data))
    end
  end

  # Appends a stage that moves the read position of the current stream to
  # absolute byte offset +n+ before passing it on.
  #
  # @param n [Integer] absolute offset assigned to the stream's +pos+
  # @return [Proc] the updated internal operation
  def skip_to(n)
    add_stage do |fp, nxt|
      fp.pos = n
      nxt.call(fp)
    end
  end

  # Runs the pipeline and returns everything the final stage produces.
  #
  # @return [String, nil] result of calling +read+ on the final stream
  def consume_all
    ret = nil
    @op.call(lambda { |fp| ret = fp.read })
    ret
  end

  # Runs the pipeline and hands the final stream to the given block.
  #
  # @yieldparam fp [#read] the final stream
  # @return [Object] whatever the chain of stages returns for this run
  def consume(&consumer)
    @op.call(lambda { |fp| consumer.call(fp) })
  end

  # Runs the pipeline and copies the remaining content of the final stream
  # into +out_fp+.
  #
  # @param out_fp [#write] destination stream
  # @return [nil]
  def pipe_to(out_fp)
    @op.call(lambda { |fp|
      # copy_stream moves the data in chunks instead of loading the whole
      # stream into one String first.
      IO.copy_stream(fp, out_fp)
      nil
    })
  end

  private

  # Wraps the current operation so that +stage+ runs between it and whatever
  # comes next. +stage+ is called with the upstream stream and the downstream
  # continuation, and is responsible for invoking that continuation.
  def add_stage(&stage)
    upstream = @op
    @op = lambda { |nxt| upstream.call(lambda { |fp| stage.call(fp, nxt) }) }
  end
end