BioJulia系列-Automa
正则表达式理论基础
正则表达式的两个原子属性:
re""
: 空字符串是合法的正则表达式re"p"
: 有限字母表中的单个符号匹配
原子操作可以与以下操作组合:
|
: 交替,R | P
表示匹配R或P*
: 串联,R * P
表示先匹配R再匹配PR*
: 重复, 匹配0次或多次R
其他正则表达式匹配操作可以从原子操作组合而成:
R?
可以视为""|R
;R+
可以视为RR*
;
基于以上理论构建的正则表达式,其算法复杂度为
O(N)
时间和O(1)
空间, 所以是非常快的正则表达式的以上实现, 被转换成
有限自动机
, 然后在代码中实现
有限自动机
分为非确定性自动机(non-deterministic finite automata, NFA)和确定性自动机(deterministic finite automata, DFA)。
非确定性自动机
具有可以从节点 A 移动到节点 B 的
ɛ
边,而无需符号成本;可以从一个状态进行多次转换;
可以有多个最终状态;
确定性自动机
没有
ɛ
边;只能有一个状态的转换;
只能有一个最终状态;
更容易在代码中实现, 速度更快;
可以(通过折叠节点)优化;
NFA转换成DFA
将具有N
个节点的NFA转换为DFA可能会产生\(2^N\)个节点(最坏的情况),主要有两种方法可以解决此问题:
Powerset construction
:Automa.jl
采用的方法, 直接构造DFA, 并接受最坏情况。\(Ø(2^N)\)的最坏情况通常很少发生, 而且automa的构造是发生在编译时, 速度影响不大;Adaptive construction
:ripgrep
等命令行工具常采用的方法(命令行工具的DFA构造发生在运行时, 所以必须要用一种动态高效的策略), 动态构造DFA并进行缓存, 如果DFA变得太大, 就刷新缓存, 如果刷新的太频繁, 就退回重新构建NFA;
Automa基本原理
Automa
创建Julia Expr
对象来模拟DFA, 然后用元编程生成Julia函数, 然后让Julia和LLVM编译器优化函数;可以将Julia代码放入DFA模拟中, 目前仅支持两种代码:
actions
和preconditions
;actions
: 在状态转换时执行的代码;preconditions
: 在状态转换之前执行的代码, 返回值必须是Bool
;
Regex
例子:
文本验证器
缓冲区验证器
Automa提供一个generate_buffer_validator
函数, 用于生成一个缓冲区验证器.
IO验证器
对大文件, 通常不能全部读入Buffer, 此时可以使用generate_io_validator
生成一个IO验证器.
IO验证器如果错误, 返回(byte, (line, column))
, 其中byte
是第一个错误字节,(line,column)
是字节的位置。如果错误字节是换行符,则列为0。如果输入到达意外的EOF,则字节为 nothing
, 并且(line,column)
指向 IO 中的最后一行/最后一列
相关API
分词器
用于将输入文本分解为小块
分词器本身是上下文未知的
制作分词器
Automa.jl
中制作分词器: make_tokenizer
:
julia> make_tokenizer(
[
re"\(",
re"\)",
re",",
re"\"",
re" +",
re"[a-zA-Z]+"
]
) |> eval
默认的分词器用
UInt32
当作tokens解析格式, 可以提通过调用tokenize(UInt32, data)
来解析, 详细用法见下方API介绍。
UInt32
当作tokens不太方便, 可以用enums
来创建: julia> @enum Token error lparens rparens comma quot space letters
julia> make_tokenizer((error, [
lparens => re"\(",
rparens => re"\)",
comma => re",",
quot => re"\"",
space => re" +",
letters => re"[a-zA-Z]+"
])) |> eval
julia> collect(tokenize(Token, "XY!!)"))
3-element Vector{Tuple{Int64, Int32, Token}}:
(1, 2, letters)
(3, 2, error)
(5, 1, rparens)
更简便地, 可以一次性定义enum并构建tokenizer:
tokens = [
:lparens => re"\(",
:rparens => re"\)",
:comma => re",",
:quot => re"\"",
:space => re" +",
:letters => re"[a-zA-Z]+"
]
@eval @enum Token error $(first.(tokens)...)
make_tokenizer((error,
[Token(i) => j for (i,j) in enumerate(last.(tokens))]
)) |> eval
julia> make_tokenizer([re"[ab]+", re"ab*", re"ab"]) |> eval
"a", "aa", "ab"
等字符串时, 多个正则表达式都可以匹配, 具体对应哪个token, 根据如下两个规则判断:
匹配最长的token: 比如用上述token解析
"aa"
时,re"[ab]+"
会解析成一个长度为2byte的token, 而re"ab*"
会解析成两个长度为1byte的token, 因此"aa"
的输出token是re"[ab]+"
;输出索引更高的token: 如果两个token长度一样, 输出索引更高的那个。比如匹配
"ab"
时, 三个正则都可以匹配, 且长度都是1byte, 此时输出re"ab"
, 因为它的index最大;
在make_tokenizer
时添加unambiguous=true
参数, 可以消除歧义, 即当出现有歧义的token时, 就报错。这会导致大多数分词器构建报错, 因为大多数分词器都是有歧义的。
Tokenizer{E, Data, C::Int}
相关API
Tokenizer{E, Data, C::Int}
生成针对数据
Data
的E
类型Token迭代器;迭代器返回一个三个元素的Tuple:
Tuple{Int, Int, Token}
第一个是buffer中token的起始位置(1-based);
第二个是token长度;
第三个是token种类(token的索引)
C::Int
: 标识数字, 用于从同一套参数创建多个分词器
tokenize(::Type{E}, data, version=1)
Tokenizer{E, typeof(data), version}
make_tokenizer(
machine::TokenizerMachine;
tokens::Tuple{E, AbstractVector{E}} = [ integers ],
goto=true, version=1
) where E
代码生成器, 定义
Base.iterate(::Tokenizer{E, D, $version})
;tokens
: 一个元组, 第一个参数是error token, 第二个参数是所有non-error token组成的向量;version
: 设置Tokenizer
类型的最后一个参数;
例子:
julia> machine = compile([re"a", re"b"]);
julia> make_tokenizer(machine; tokens=(0x00, [0x01, 0x02])) |> eval
julia> iter = tokenize(UInt8, "abxxxba"); typeof(iter)
Tokenizer{UInt8, String, 1}
julia> collect(iter)
5-element Vector{Tuple{Int64, Int32, UInt8}}:
(1, 1, 0x01)
(2, 1, 0x02)
(3, 3, 0x00)
(6, 1, 0x02)
(7, 1, 0x01)
make_tokenizer(
tokens::Union{
AbstractVector{RE},
Tuple{E, AbstractVector{Pair{E, RE}}}
};
goto::Bool=true,
version::Int=1,
unambiguous=false
) where E
一个方便的多重派发, 输入tokens
, 直接编译成machine
, 然后再生成Tokenizer
;
如果
tokens
是一个RE
向量, 会自动创建一个error token
(index为0), 然后创建index为输入向量下标的non-error token
; 否则,tokens
必须是一个元组, 第一个元素是error token
, 第二个元素是pair
类型的non-error token
(:name => re"pattern"
)数组;
例子:
julia> make_tokenizer([re"abc", re"def") |> eval
julia> collect(tokenize(Int, "abcxyzdef123"))
4-element Vector{Tuple{Int64, Int32, UInt32}}:
(1, 3, 0x00000001)
(4, 3, 0x00000003)
(7, 3, 0x00000002)
(10, 3, 0x00000003)
从Buffer解析
当前的
Automa
通过指针加载数据, 因此需要数据类型是String
或支持转换成Array{UInt8}
, 不支持UnitRange{UInt8}
等类型;尽量不要把strided view传递给Automa (我甚至都不知道atrided view是啥, 貌似应该翻译成跨度视图, 存储了跨度信息, 允许指针以各种步幅在内存中移动。Automa始终将指针一次前进一个字节, 不考虑步幅);
利用元编程来组合正则表达式和Julia代码, 从而构建解析器;
例子 FASTA格式解析器的构建
struct Seq
name::String
seq::String
end
action
Automa目前支持在RE中的以下位置添加
action
:onenter!
: 在读取RE的第一个字节时执行;onfinal!
: 在读取RE的最后一个字节时执行;onexit!
: 在读取RE结束后的第一个字节时执行, 或者在遇到输入末尾退出RE时执行(意外退出时不适用);onall!
: 在读取RE的每个字节时都执行;
action
可以设置名称(或名称列表), 用Symbol
表示:
my_regex = re"ABC";
onenter!(my_regex, [:action_a, :action_b]);
onexit!(my_regex, :action_c);
onXXX!
函数返回修改的正则表达式, 因此可以嵌套:my_re = onexit!(onenter!(re"ABC", [:action_a, :action_b]), :action_c); my_regex isa RE
Machine
Machine
类型表示优化后的DFA, 可以用complie
函数生成(RE->NFA->DFA->Machine);RE在Automa中最终都要编译成
Machine
的, 所以惯用语法是把RE的定义用let
语句包含在machine的编译过程:
machine = let
header = re"[a-z]+"
seqline = re"[ACGT]+"
record = re">" * header * '\n' * rep1(seqline * '\n')
compile(rep(record)) # 逐步组合RE, 生成最终要编译成machine的RE
end
@assert machine isa Automa.Machine
4.1 得到Machine以后, 我们需要定义如何根据匹配规则和action标签生成想要的action代码, Machine中有一些保留变量:
byte
: 当前输入的字节,UInt8
格式;p
:byte
在buffer中的索引位置(1-based);p_end
: 输入buffer的长度;is_eof
: machine是否到达输入终点;cs
: machine的当前状态,Int
格式;data
: 输入buffermem
: 正在读的内存信息, 是一个Automa.SizedMemory
对象, 包含指针和长度(一般情况应该用不到这个保留变量)
最终编写的action代码, 要放到一个字典中: Dict{Symbol, Expr}
:
actions = Dict(
:mark_pos => :(pos = p),
:header => :(header = String(data[pos:p-1])),
:seqline => :(append!(buffer, data[pos:p-1])),
:record => :(push!(seqs, Seq(header, String(buffer))))
);
如果表达式有多行, 可以用
quote...end
代码块来包裹
4.2 构造解析数据的函数
@eval function parse_fasta(data)
pos = 0
buffer = UInt8[]
seqs = Seq[] # 初始化seqs
header = "" # 初始化header
$(generate_code(machine, actions)) # action dict生成相关代码
return seqs
end
# 现在可以使用parse_fasta函数了:
parse_fasta(">abc\nTAGA\nAAGA\n>header\nAAAG\nGGCG\n")
\n
换行符, 即>a\nATCG\n
是合法的一条记录, 但>a\nATCG
不是。
可以容易地更改正则表达式以取消该限制:
machine = let
header = onexit!(onenter!(re"[a-z]+", :mark_pos), :header)
seqline = onexit!(onenter!(re"[ACGT]+", :mark_pos), :seqline)
record = onexit!(re">" * header * '\n' * seqline * rep('\n' * seqline), :record)
compile(opt(record) * rep('\n' * record) * rep(re"\n"))
end;
ERROR: Ambiguous NFA.
>a\nATCG\n
, 在遇到第二个\n
后, 解释器不知道如何操作了: 如果下一个字节是A
, 则应该执行:seqline
操作; 如果下一个字节是>
, 则应该先执行:seqline
再执行:record
, 而Automa是无法预先知道下一个字节是啥的, 所以会拒绝编译。
有几种方法可以解决这个问题:
重写正则表达式, 避免歧义: 这通常是首选解决方案;
在complie时设置
unambiquous=false
: 这通常不是好的解决方案, 因为会引入未定义的行为;重写action dict
使用前置条件(preconditions)
precondition
是一个Symbol
, 附加到Expr
对象, 计算结果必须是Bool
型, 例如:
一个有歧义的machine:
machine = let
a = onenter!(re"XY", :a)
b = onenter1(re"XZ", :b)
compile('A' * (a | b))
end;
precond!
来消除歧义: machine = let
a = precond!(onenter!(re"XY", :a), :test)
b = precond!(onenter!(re"XZ", :b), :test; bool=false) # <==
compile('A' * (a | b))
end
# 上述代码中, precond!(regex, label)等效于precond!(regex, label; when=:enter, bool=true)
# 即仅当`:test`为`false`时, 再解析`re"XZ"`
相关API
onenter!(re::RE, a::Union{Symbol, Vector{Symbol}}) -> re
设置读取RE的第一个字节时执行的action
如果传递了多个action, 则顺序执行
例子:
regex = re"ab?c*"
regex2 = onenter!(regex, :entering_regex)
regex === regex2 # true
onexit!
, onall!
, onfinal!
与之类似, 不再赘述。
precond!(re::RE, s::Symbol; [when=:enter], [bool=true]) -> re
设置
s
为re
的前置条件: 当s
为bool
时, 才进行re
的状态转换;when
: 可选项:enter
和:all
, 控制检查是只在进入re时执行还是在re的每个状态转换时都执行;
例子:
regex = re"ab?c"
regex2 = precond!(regex, :some_condition);
regex === regex2 # true
generate_code([::CodeGenContext], machine::Machine, actions=nothing)::Expr
为machine生成初始化和执行代码;
实际上是以下代码生成函数的简写:
generate_init_code(ctx, machine)
generate_action_code(ctx, machine, actions)
generate_input_error_code(ctx, machine)
(如果actions
设为:debug
则跳过该步骤)
自定义代码生成
Automa
生成的精确代码不是稳定的, 可能会在没有警告的情况下进行更改, 只有DFA模拟
是稳定的;以一个例子看一下生成的代码:
# Initialize variables used in the code below (变量初始化)
byte::UInt8 = 0x00
p::Int = 1
p_end::Int = sizeof(data)
p_eof::Int = p_end
cs::Int = 1
# Turn the input buffer into SizedMemory, to load data from pointer
GC.@preserve data begin
mem::Automa.SizedMemory = (Automa.SizedMemory)(data)
# For every input byte:
while p ≤ p_end && cs > 0
# Load byte
byte = mem[p]
# Load the action, to execute, if any, by looking up in a table
# using the current state (cs) and byte
@inbounds var"##292" = Int((Int8[0 0 ... 0 0; 0 0 ... 0 0; ... ; 0 0 ... 0 0; 0 0 ... 0 0])[(cs - 1) << 8 + byte + 1])
# Look up next state. If invalid input, next state is negative current state
@inbounds cs = Int((Int8[-1 -2 ... -5 -6; -1 -2 ... -5 -6; ... ; -1 -2 ... -5 -6; -1 -2 ... -5 -6])[(cs - 1) << 8 + byte + 1])
# Check each possible action looked up above, and execute it
# if it is not zero
if var"##292" == 1
pos = p
elseif var"##292" == 2
header = String(data[pos:p - 1])
elseif if var"##292" == 3
append!(buffer, data[pos:p - 1])
elseif var"##292" == 4
seq = Seq(header, String(buffer))
push!(seqs, seq)
end
# Increment position by 1
p += 1
# If we're at end of input, and the current state in in an accept state:
if p > p_eof ≥ 0 && cs > 0 && (cs < 65) & isodd(0x0000000000000021 >>> ((cs - 1) & 63))
# What follows is a list of all possible EOF actions.
# If state is state 6, execute the appropriate action
# tied to reaching end of input at this state
if cs == 6
seq = Seq(header, String(buffer))
push!(seqs, seq)
cs = 0
# Else, if the state is < 0, we have taken a bad input (see where cs was updated)
# move position back by one to leave it stuck where it found bad input
elseif cs < 0
p -= 1
end
# If cs is not 0, the machine is in an error state.
# Gather some information about machine state, then throw an error
if cs != 0
cs = -(abs(cs))
var"##291" = if p_eof > -1 && p > p_eof
nothing
else
byte
end
Automa.throw_input_error($machine, -cs, var"##291", mem, p)
end
end
end # GC.@preserve
上述代码是顺序结构的
表代码生成器
结果, 如果设置goto=true
, 会生成goto代码:更难阅读;
代码更大;
不使用边界检查;
不允许自定义getbyte;
比表代码快得多
CodeGenContext类型
CodeGenContext(;
vars=Variables(:p, :p_end, :is_eof, :cs, :data, :mem, :byte, :buffer),
generator=:table,
getbyte=Base.getindex,
clean=false
)
CodeGenContext
类型(简称ctx)用于存储自第一代码的设置项, 如果没有显式声明, 则使用默认的ctx.
自定义变量名称:
ctx包含一个
Variables
对象, 记录生成代码中使用的变量名集合, 该对象中存储的变量名可以用.vars
调用, 如ctx.p; ctx.p_end; ctx.byte
;可以通过更改
Variables
对象中的变量名自定义生成代码的变量名, 如把byte
改成u8
:
ctx = CodeGenContext(vars=Automa.Variables(byte=:u8))
code = generate_code(ctx, machine, actions)
clean
: 设置为true则会移除大多数行号信息;getbyte
: 从主循环中获取byte的函数, 默认是Base.getindex
, 可以是任意函数, 调用方式是getbyte_fun(data, p)
性能优化
前边的FASTA解析例子, 原始代码速度为300 MB/s
, 现在我们对其进行进一步优化。
原始代码为:
struct Seq
name::String
seq::String
end
machine = let
header = onexit!(onenter!(re"[a-z]+", :mark_pos), :header)
seqline = onexit!(onenter!(re"[ACGT]+", :mark_pos), :seqline)
record = onexit!(re">" * header * '\n' * rep1(seqline * '\n'), :record)
compile(rep(record))
end
@assert machine isa Automa.Machine
actions = Dict(
:mark_pos => :(pos = p),
:header => :(header = String(data[pos:p-1])),
:seqline => :(append!(buffer, data[pos:p-1])),
:record => :(push!(seqs, Seq(header, String(buffer))))
);
@eval function parse_fasta(data)
pos = 0
buffer = UInt8[]
seqs = Seq[] # 初始化seqs
header = "" # 初始化header
$(generate_code(machine, actions)) # action dict生成相关代码
return seqs
end
parse_fasta(">abc\nTAGA\nAAGA\n>header\nAAAG\nGGCG\n")
seq类型
,而是记录输入数据对应的的索引位置struct SeqPos
offset::Int
hlen::Int32
slen::Int32
end
这样做的初衷是尽可能地避免allocations
, 因此, 对应的actions
也要改:
actions = Dict(
:mark_pos => :(pos = p),
:header => :(hlen = p - pos),
:seqline => :(slen += p - pos),
:record => quote
seqpos = SeqPos(offset, hlen, slen)
nseq += 1
seqs[nseqs] = seqpos
offset += hlen + slen
slen = 0
end
);
@assert actions isa Dict
对应的解析函数也要改:
@eval function parse_fasta(data)
pos = slen = hlen = offset = nseqs = 0
seqs = Vector{SeqPos}(undef, 400000)
$(generate_code(machine, actions))
return seqs
end
作者测试, 这样的操作, 速度提升到450MB/s;
进一步采用goto
方式的代码生成器:$(generate_code(CodeGenContext(generator=:goto)), machine, actions))
, 解析速度提升到4GB/s(强啊!)
API
CodeGenContext(;
vars=Variables(:p, :p_end, :is_eof, :cs, :data, :mem, :byte, :buffer),
generator=:table,
getbyte=Base.getindex,
clean=false
)
vars
: 变量名, 参阅Variables
结构;generator
::table
或:goto
;getbyte
: byte index获取函数f(data, p)
;clean
: 是否清除QuoteNode
(行信息);
p::Int
: current position of datap_end::Int
: end position of datais_eof::Bool
: Whether p_end marks end file streamcs::Int
: current statedata::Any
: input datamem::SizedMemory
: Memory wrapping databyte::UInt8
: current byte being read from databuffer::TranscodingStreams.Buffer
: (generate_reader only)
从IO解析
生信从多数数据文件大小动辄数十GB, 从buffer中加载这些文件是不现实的;
Automa
通过与TranscodingStreams.jl联动, 生成stream buffer从而实现IO解析;好处是可以充分利用
TranscodingStreams.jl
的解码标准, 支持各种压缩格式;坏处是与简单的buffer解析相比, 大大增加了复杂度:
从stream读取时, 字节数组总是输入数据的一小部分
到达stream的末尾时, buffer数据会被刷新
为了解决上述问题, Automa
打破了TranscodingStreams
的一些抽象, 提供了一个generate_reader
函数, 该函数是一个代码生成器, 生成的Julia代码是一个可以把IO
当作输入的解析器函数:
# generate_reader生成的代码示例:
function { function name }(stream::TranscodingStream, { args... })
{ init code }
@label __exec__
p = current buffer position
p_end = final buffer position
# the eof call below will first flush any used data from buffer,
# then load in new data, before checking if it's really eof.
is_eof = eof(stream)
execute normal automa parsing of the buffer
update buffer position to match p
{ loop code }
if cs < 0 # meaning: erroneous input or erroneous EOF
{ error code }
end
if machine errored or reached EOF
@label __return__
{ return code }
end
@goto __exec__
end
与之前从buffer解析生成的代码相比, 主要区别是@goto __exec__
, 这个代码让Automa重复将数据加载到buffer, 执行解析, 然后刷新buffer, 再执行解析...直到中断/结束。
p
和p_end
是当前buffer的位置, 而不是stream中的位置, 所以不能简单地存储指向当前p
的变量marked_pos
;
IO解析器的构建
使用generate_reader
构建IO解析器:
需要设置
returncode
和errorcode
errorcode
可以设置成立即返回:@goto __return__
return_code = :(iszero(cs))
error_code = :(@goto __return__)
eval(generate_reader(:validate_fasta, machine; returncode=return_code, errorcode=error_code))
上述代码生成一个validate_fasta(stream::TranscodingStream)
函数, 如果IO不是一个TranscodingStream
类型, 可以用NoopStream
函数将其包装成TranscodingStream
类型:
io = NoopStream(IOBuffer(">a\nTAG\nTA\n>bac\nG\n"))
validate_fasta(io) # true
validate_fasta(NoopStream(IOBuffer("random data"))) # false
读取单条记录
上述代码对IO进行全局匹配检查, 但是如果我们想在读取到第一条fasta记录后就停掉该如何做呢, 可以用@escape
伪宏:
struct Seq
name::String
seq::String
end
machine = let
header = onexit!(onenter!(re"[a-z]+", :mark_pos), :header)
seqline = onexit!(onenter!(re"[ACGT]+", :mark_pos), :seqline)
record = onexit!(re">" * header * '\n' * rep1(seqline * '\n'), :record)
compile(rep(record))
end
@assert machine isa Automa.Machine
# add @escape
actions = Dict{Symbol, Expr}(
:mark_pos => :(pos = p),
:header => :(header = String(data[pos:p-1])),
:seqline => :(append!(seqbuffer, data[pos:p-1])),
:record => quote
seq = Seq(header, String(seqbuffer))
found_sequence = true
# 如果不是文件末尾, 就重新设置p = p-1
p -= !(is_eof && p > p_end)
@escape
end
)
@assert actions isa Dict
@escape
是伪宏
, 是因为它在宏展开以后但在成Julia代码之前工作, 将@escape
所在行替换成跳出machine的代码。
使用generate_reader
封装以上actions
:
generate_reader(
:read_record,
machine;
actions=actions,
initcode=quote
seqbuffer = UInt8[]
pos = 0
found_sequence = false
header = ""
end,
loopcode=quote
if (is_eof && p > p_end) || found_sequence
@goto __return__
end
end,
returncode=:(found_sequence ? seq : nothing)
) |> eval
在上述代码中, 设置了一个flag来判断是否应该返回(
found_sequence
), 这是因为loopcode
发生在机器执行之后, 所以循环可能因为被@escape
中断, 也可能因为buffer用完了需要重新加载; 如果直接返回, 就会跳过generate_reader
中设置buffer状态的相关代码; 而在loopcode中, 在buffer刷新之后, 判断found_sequence
标签, 则可以避免这个问题。使用
@escape
来停止machine的执行为什么要递减p?
当IO读到下一个
>
时, 意味着一条fasta记录的读取结束, 如果此时从同一IO读取另一条记录, 开头的>
是已经被读完了的, 所以需要重设p=1
以读取>
上述代码生成的read_record
函数可以这样调用:
io = NoopStream(IOBuffer(">\nT\n>tag\nGAGA\nTATA\n"));
read_record(io) # return the 1st seq record
read_record(io) # return the 2nd seq record
read_record(io) # return nothing
通过标记buffer来保留数据
上述代码有个问题, 如actions中的header = String(data[pos:p-1])
这句代码中, 通过访问data buffer的下标来提取header信息, 但是在读取IO时, 我怎么知道在我定义pos
之前, data没有改变呢?
>abcdefghijkl\nA
时, buffer首先读>abcdefg
, 然后就满了, 然后开始解析header, 此时正常解析p = 2
, 所以pos = 2
; 但之后读取header的后半段时, buffer已经更新了, 此时数据是hijkl\nA
, 但p=14
, 如果访问data[2:13]
, 就会越界。
TranscodingStreams
的buffer允许对一个位置进行标记(mark
), 此时buffer不会刷新标记的位置或标记位置之后的任何位置
使用
@mark()
宏标记p
;使用
@markpos()
宏获取标记的位置;@markpos()
获取的位置指向buffer中的相同位置, 即便buffer刷新也不变;为啥这样可用?
mark
存储在TranscodingStream
的buffer中, 随着内容更新, 会自动更新mark;
有了标记功能, 就可以重写之前的不合理代码了:
actions = Dict{Symbol, Expr}(
:mark_position => :(@mark),
:header => :(header = String(data[@markpos():p-1])),
:seqline => :(append!(buffer, data[@markpos():p-1])),
# 其他代码略
)
2
的位置: content: >abcdefg
mark:.....^
p = 2.....^
当p = 9
时, buffer耗尽, 更新buffer, 保留byte2
, 删除开始位置到标记位置之前的byte(在这里是byte 1), generate_reader
生成的代码跳转到@label __exec__
位置, 并将p
设置为当前buffer的位位置:
content: abcdefgh
mark:....^
p = 8...........^
之后buffer再次耗尽, 此时标记位置在起始位置, 所以没有数据被删除, 此时将调整buffer以容纳更多数据:
content: abcdefghijkl\nA
mark:....^
p = 9............^
最后, 读取到p = 13
时, 读取完整个表头, data[@markpos():p-1]
此时可以正确匹配header信息(这里是1:12
)
content: abcdefghijkl\nA
mark:....^
p = 13...............^
最最后, 记得更新mark, 或者用@unmark()
清除mark, 从而允许刷新buffer。
相关API
generate_reader(f::Symbol, m::Automa.Machine; kwargs...)
kwargs
可选项:
arguments
: 传递给f
的其他参数信息, 默认是()
;context
: 指定代码生成器, 默认是Automa.CodeGenCOntext()
;actions
: Action字典, 默认Dict{Symbol, Expr()}
initcode
: 初始化代码, 默认是()
;loopcode
: 循环代码, 默认是()
;returncode
: 返回代码, 默认是()
;errorcode
: 在loopcode中cs<0
后执行的代码;
@escape
伪宏(Pseudomacro), 用于中断Machine的执行, p
会正常前进一格: 如解析ABC
的B
时执行了@escape
, 下一个byte是C
。
@mark
伪宏, 标记当前buffer中的p位置, 被标记的位置不会从buffer中被清洗掉;
@unmark
伪宏, 从buffer中删除标记, 从而允许清除buffer中之前的数据;
@markpos
伪宏, 获取标记在buffer中的位置;
@bufferpos
伪宏, 返回当前TranscodingStreams buffer
的整数位置(只能在generate_reader
中使用);
例子:
# ...
# Inside some Automa action code
@setbuffer()
description = sub_parser(stream)
p = @bufferpos()
@relpos
伪宏, 返回相对于@markpos()
的p
, 等效于p - @markpos() + 1
, 用于在已经标记的情况下标记数据流中的其他点, 可以用@abspos
获取相应的action position。
例子:
# In one action
identifier_pos = @relpos(p)
# Later, in a different action
identifier = data[@abspos(identifier_pos): p]
@abspos
伪宏, 获取@relpos
标记的相对位置。
@setbuffer
伪宏, 更新buffer的位置以匹配p
, 用于在调用新的解析器前更新buffer。
例子:
# Inside some Automa action code
@setbuffer()
description = sub_parser(stream)
p = @bufferpos()
创建Reader
类型
在上一节用generate_reader
构建的IO解析器中存在一个问题: 虽然我们可以通过多次调用read_record
来读取多条记录, 但是多次调用之间没有保存任何状态信息, 这也是为什么每次都要重新设置p
的原因。
想象你有一种数据格式, 包含A和B两种类型的条目, A条目必须位于B条目之前。因此, 虽然B条目可以在文件中随时出现, 但一旦看到B, 就不能再有A了, 此时需要在读取文件时记录是否已经看到过B条目。
在Automa
中, 通过创建一个Reader
类型来解决这个问题。用Reader
封装正在解析的IO, 并储存相关的状态信息, 还是以FASTA格式为例来看:
struct Seq
name::String
seq::String
end
machine = let
header = onexit!(onenter!(re"[a-z]+", :mark_pos), :header)
seqline = onexit!(onenter!(re"[ACGT]+", :mark_pos), :seqline)
record = onexit!(re">" * header * '\n' * rep1(seqline * '\n'), :record)
compile(rep(record))
end
@assert machine isa Automa.Machine
# 配置Reader类型
mutable struct Reader{S <: TranscodingStream}
io::S
automa_stat::Int
end
Reader(io::TranscodingStream) = Reader{typeof(io)}(io, 1)
Reader(io::IO) = Reader(NoopStream(io))
Reader
包含一个状态信息属性, Automa的起始状态始终为1。现在, 我们可以创建一个新的Reader函数, 与之前相比有三点不同:
不再需要递减
p
: 现在我们可以在Records之间记录Automa的状态, 无需重置p
来配合初始化IO了;返回值是
(cs, state)
而不仅仅是state
: 因为我们想要更新Automa的状态, 所以读取下一条记录时, 从上一个条目停止时的相同状态开始;添加了
start_state
参数, 并在initcode
中将cs
设置为start_state
, 从而保证machine有正确的起始状态;
actions = Dict{Symbol, Expr}(
:mark_pos => :(@mark),
:header => :(header = String(data[@markpos():p-1])),
:seqline => :(append!(seqbuffer, data[@markpos():p-1])),
:record => quote
seq = Seq(header, String(seqbuffer))
found_sequence = true
@escape
end
)
generate_reader(
:read_record,
machine;
actions=actions,
arguments=(:(start_state::Int),),
initcode=quote
seqbuffer = UInt8[]
found_sequence = false
header = ""
cs = start_state
end,
loopcode=quote
if (is_eof && p > p_end) || found_sequence
@goto __return__
end
end,
returncode=:(found_sequence ? (cs, seq) : throw(EOFError()))
) |> eval
最后, 我们创建一个函数, 从Reader
中读取数据, 并确保更新automa_state
:
function read_record(reader::Reader)
(cs, seq) = read_record(reader.io, reader.automa_state)
reader.automa_state = cs
return seq
end
测试一下吧:
julia> reader = Reader(IOBuffer(">a\nT\n>tag\nGAG\nATATA\n"));
julia> read_record(reader)
Seq("a", "T")
julia> read_record(reader)
Seq("tag", "GAGATATA")
julia> read_record(reader)
ERROR: EOFError: read end of file
Automa的调试
从上述教程可以看到, 利用Automa构建一个解析器是相对复杂的过程, 所以构建的时候容易出错, Automa提供了一些自带/第三方工具用于调试。
Revise.jl
无法自动更新Automa生成的函数。所以修改后手动重新运行定义相关函数的代码通常是更快的方案。
machine = let
alphabet = re"BC"
band = onenter!(re"BBA", :cool_band)
compile(re"XYZ A" * (alphabet | band))
end
Automa的报错会提供一个引发错误的示例输入, 可以根据该示例回溯哪里引发了歧义。上边的例子比较简单, 下边还是以FASTA为例提供一个稍复杂的例子:
fasta_machine = let
header = re"[a-z]+"
seq_line = re"[ACGT]+"
sequence = seq_line * rep('\n' * seq_line)
record = onexit!('>' * header * '\n' * sequence, :emit_record)
compile(rep(record * '\n') * opt(record))
end
# ERROR
# Ambiguous NFA. After inputs ">a\nA", observing '\n' lead to conflicting action sets nothing and [:emit_record]
问题出现在序列行尾的\n
, 上述写法导致machine无法判断\n
是前一条序列的结尾, 还是后一条序列的起始, 所以应该把末位的\n
当作前一条序列的一部分:
fasta_machine = let
header = re"[a-z]+"
seq_line = re"[ACGT]+"
sequence = rep1(seq_line * '\n')
record = onexit!('>' * header * '\n' * sequence, :emit_record)
# A special record that can avoid a trailing newline, but ONLY if it's the last record
record_eof = '>' * header * '\n' * seq_line * rep('\n' * seq_line) * opt('\n')
compile(rep(record * '\n') * opt(record_eof))
end
@assert fasta_machine isa Automa.Machine
当实在无法编译通过时, 可以设置unambiguous=false
, 但是自己要确保程序的歧义行为不会引发下游的问题。
machine2dot(m::Machine)
函数返回Graphviz.dot
格式的字符串, 可以用Graphviz生成svg格式的图片;可以用以下自定义函数自动执行
machine->dot->svg->view
:
function display_machine(m::Machine)
open("/tmp/machine.dot", "w") do io
println(io, Automa.machine2dot(m))
end
run(pipeline(`dot -Tsvg /tmp/machine.dot`, stdout="/tmp/machine.svg"))
run(`firefox /tmp/machine.svg`)
end
另外有一些Automa的内部函数用于调试DFA/NFA:
re2nfa
: 从RE构建NFA;nfa2dot
: 从NFA构建.dot
格式的图;nfa2dfa
: 略dfa2dot
: 略
generate_code
函数中的actions
参数, 如果设置为:debug
, 则会将给定Machine
的所有操作都替换成:(push!(logger, action_name))
, 此时运行该machine会实时输出匹配的action_name: @eval function debug(data)
logger = []
$(generate_code(fasta_machine, :debug))
logger
end
debug(">abc\nTAG")
4-element Vector{Any}:
:mark
:header
:mark
:seqline
:record
:debug
会替换所有的action
, 所以如果Machine是需要通过修改action
的操作(如修改p
)才能起作用, 就不能使用:debug
。
test/debug.jl
中包含更多的调试函数, 可以被include
。 主要的调试函数是debug_execute
和create_debug_function
: machine = let
letters = onenter!(re"[a-z]+", :enter_letters)
compile(onexit!(letters * re",[0-9]," * letters, :exiting_regex))
end
eval(create_debug_function(machine; ascii=true))
(end_state, transitions) = debug_compile("abc,5,d!")
@show end_state
transitions
# 将输出:
end state = -6
7-element Vector{Tuple{Char, Int64, Vector{Symbol}}}:
('a', 2, [:enter_letters])
('b', 2, [])
('c', 2, [])
(',', 3, [])
('5', 4, [])
(',', 5, [])
('d', 6, [:enter_letters])
# 输出tuple的元素分别为(输入字节, 读取该字节时的Automa状态, 执行的action)
debug_execute
与debugcompile
工作方式类似, 区别是不需要预先compile, 可以直接输入RE: debug_execute(re"[A-z]+", "abc1def"; ascii=true)
延伸
应用Automa的案例
举一个具体的案例分析。
案例 BioStockholm.jl
Stockholm格式(下称
sto
)是一个多序列比对格式, 用于Pfam, Rfam, Dfam等数据库;后缀常用
sto
或stk
;
语法:
需要
#
开始的表头, 第一行为格式版本,# STOCKHOLM 1.0
;表头后跟多行记录, 每条记录都包含自己的
#
开头的标记行以及序列行, 然后以//
结束:
# STOCKHOLM 1.0
#=GF ID EXAMPLE
<seqname> <aligned sequence>
<seqname> <aligned sequence>
<seqname> <aligned sequence>
//
示例:
# STOCKHOLM 1.0
#=GF ID UPSK
#=GF SE Predicted; Infernal
#=GF SS Published; PMID 9223489
#=GF RN [1]
#=GF RM 9223489
#=GF RT The role of the pseudoknot at the 3' end of turnip yellow mosaic
#=GF RT virus RNA in minus-strand synthesis by the viral RNA-dependent RNA
#=GF RT polymerase.
#=GF RA Deiman BA, Kortlever RM, Pleij CW;
#=GF RL J Virol 1997;71:5990-5996.
AF035635.1/619-641 UGAGUUCUCGAUCUCUAAAAUCG
M24804.1/82-104 UGAGUUCUCUAUCUCUAAAAUCG
J04373.1/6212-6234 UAAGUUCUCGAUCUUUAAAAUCG
M24803.1/1-23 UAAGUUCUCGAUCUCUAAAAUCG
#=GC SS_cons .AAA....<<<<aaa....>>>>
//
说明:
序列行:
序列每行一条, 序列名称顶格, 名称格式:
name/start-end
或name
, 名称后的任意数量空格后跟着序列;序列字母支持除空格外的其他任意字符, Gap用
.
或-
表示;
标记行:
以
#
开头, 空格分隔参数;定义的标记类型:
#=GF <feature> <Generic per-File annotation, free text>
#=GC <feature> <Generic per-Column annotation, exactly 1 char per column>
#=GS <seqname> <feature> <Generic per-Sequence annotation, free text>
#=GR <seqname> <feature> <Generic per-Residue annotation, exactly 1 char per residue>
记忆思路: F=per-File; C=per-Column; S=per-Sequence; R=per-Residue
这里贴上我个人注解的作者源码实现, 想看作者的源码在这里:
module BioStockholm
# 载入依赖包和依赖函数
import Automa
using Automa: @re_str, onenter!, onexit!
const re = Automa.RegExp
using OrderedCollections: OrderedDict
# 设置输出的数据结构/函数
export MSA
"""
MSA{T}
Stockholm format for multiple sequence alignment with
annotations. Sequence data is of type `Vector{T}`.
## Fields
seq: seqname => seqdata
GF : per_file_feature => text
GS : seqname => per_seq_feature => text
GC : per_file_feature => seqdata
GR : seqname => per_seq_feature => seqdata
"""
Base.@kwdef struct MSA{T}
# seqname => seqdata
seq :: OrderedDict{String, Vector{T}} =
OrderedDict{String, Vector{T}}()
# per_file_feature => text
GF :: OrderedDict{String, String} =
OrderedDict{String, String}()
# seqname => per_seq_feature => text
GS :: OrderedDict{String, OrderedDict{String, String}} =
OrderedDict{String, OrderedDict{String, String}}()
# per_file_feature => seqdata
GC :: OrderedDict{String, Vector{T}} =
OrderedDict{String, Vector{T}}()
# seqname => per_seq_feature => seqdata
GR :: OrderedDict{String, OrderedDict{String, Vector{T}}} =
OrderedDict{String, OrderedDict{String, Vector{T}}}()
function MSA{T}(seq, GF, GS, GC, GR) where {T}
if valtype(seq) === String
seq = OrderedDict(name => T.(collect(s)) for (name,s) in seq)
end
if valtype(GC) === String
GC = OrderedDict(feat => T.(collect(s)) for (feat,s) in GC)
end
if valtype(valtype(GR)) === String
GR = OrderedDict(name => OrderedDict(feat => T.(collect(s)) for (feat,s) in f2s) for (name,f2s) in GR)
end
return new{T}(seq, GF, GS, GC, GR)
end
end
# 下面是定义base.==和base.show, 与解析器无关, 这里就不粘贴了
function Base.:(==)(s1::MSA, s2::MSA)
# pass
end
function Base.show(io::IO, mime::MIME"text/plain", msa::MSA)
# pass
end
# 定义machine
const stockholm_machine = let
nl = re"\r?\n" # 支持回车符的新行RE
ws = re"[ \t]+" # 分隔符: 空格/制表符
feature = re"[^ \t\r\n]+" # 分隔符开头的行
# TODO: slash '/' prohibited at beginning of seqname
seqname = re"[^#/ \t\r\n][^ \t\r\n]*" # 匹配序列名, 开头不能是#,/和分隔符, 后边允许是/
text = re"[^ \t\r\n][^\r\n]*"
seqdata = re"[^ \t\r\n]+"
line_header = re"# STOCKHOLM 1\.0" * nl # 头文件
line_end = re"//" * nl # 记录结束
line_GF = re"#=GF" * ws * feature * ws * text * nl
line_GC = re"#=GC" * ws * feature * ws * seqdata * nl
line_GS = re"#=GS" * ws * seqname * ws * feature * ws * text * nl
line_GR = re"#=GR" * ws * seqname * ws * feature * ws * seqdata * nl
line_seq = seqname * ws * seqdata * nl
line_empty = re"[ \t]*" * nl
# 封装成整个条目的RE
stockholm = (
re.rep(line_empty) # 空行分配给后一个条目, 而不是前一个条目
* line_header
* re.rep(line_GF | line_GC | line_GS | line_GR | line_seq | line_empty)
* line_end
)
onenter!(nl, :countline)
onenter!(feature, :enter_feature)
onexit!(feature, :feature)
onenter!(seqname, :enter_seqname)
onexit!(seqname, :seqname)
onenter!(text, :enter_text)
onexit!(text, :text)
onenter!(seqdata, :enter_seqdata)
onexit!(seqdata, :seqdata)
onexit!(line_GF, :line_GF)
onexit!(line_GC, :line_GC)
onexit!(line_GS, :line_GS)
onexit!(line_GR, :line_GR)
onexit!(line_seq, :line_seq)
Automa.compile(stockholm)
end
# 定义Actions
const stockholm_actions = Dict(
:countline => :(linenum += 1), # 记录行号
:enter_feature => :(mark = p),
:enter_seqname => :(mark = p),
:enter_text => :(mark = p),
:enter_seqdata => :(mark = p),
:feature => :(feature = mark == 0 ? "" : String(data[mark:p-1]); mark = 0),
:seqname => :(seqname = mark == 0 ? "" : String(data[mark:p-1]); mark = 0),
:text => :(text = mark == 0 ? "" : String(data[mark:p-1]); mark = 0),
:seqdata => :(seqdata = mark == 0 ? T[] : T.(collect(data[mark:p-1])); mark = 0),
:line_GF => quote
if haskey(gf_records, feature)
gf_records[feature] *= " " * text
else
gf_records[feature] = text
end
end,
# 读取记录存dict
:line_GC => :(
# gc_records[feature] = get(gc_records, feature, "") * seqdata
gc_records[feature] = append!(get(gc_records, feature, T[]), seqdata)
),
:line_GS => quote
if haskey(gs_records, seqname)
if haskey(gs_records[seqname], feature)
gs_records[seqname][feature] *= " " * text
else
gs_records[seqname][feature] = text
end
else
gs_records[seqname] = OrderedDict(feature => text)
end
end,
:line_GR => quote
if haskey(gr_records, seqname)
# gr_records[seqname][feature] = get(gr_records[seqname], feature, "") * seqdata
gr_records[seqname][feature] = append!(get(gr_records[seqname], feature, T[]), seqdata)
else
gr_records[seqname] = OrderedDict(feature => seqdata)
end
end,
:line_seq => :(
sequences[seqname] = append!(get(sequences, seqname, T[]), seqdata)
),
)
# 派发Base.read和Base.write支持MSA的读写
Base.read(io::IO, ::Type{MSA{T}}) where {T} = parse(MSA, read(io, String))
Base.read(io::IO, ::Type{MSA}) = read(io, MSA{Char})
Base.read(filepath::AbstractString, ::Type{MSA{T}}) where {T} = open(filepath) do io
read(io, MSA)
end
Base.read(filepath::AbstractString, ::Type{MSA}) = read(filepath, MSA{String})
Base.write(io::IO, msa::MSA) = print(io, msa)
Base.write(filepath::AbstractString, msa::MSA) =
open(filepath, "w") do io
print(io, msa)
end
# 派发Base.parse 支持 MSA
Base.parse(::Type{MSA}, data::Union{String, Vector{UInt8}}) =
parse_stockholm(Char, data)
Base.parse(::Type{MSA{T}}, data::Union{String, Vector{UInt8}}) where {T} =
parse_stockholm(T, data)
const context = Automa.CodeGenContext(generator=:goto)
@eval function parse_stockholm(::Type{T}, data::Union{String, Vector{UInt8}}) where {T}
# variables for the action code
sequences = OrderedDict{String,Vector{T}}() # seqname => seqdata
gf_records = OrderedDict{String,String}() # feature => text
gc_records = OrderedDict{String,Vector{T}}() # feature => seqdata
gs_records = OrderedDict{String,OrderedDict{String,String}}() # seqname => feature => text
gr_records = OrderedDict{String,OrderedDict{String,Vector{T}}}() # seqname => feature => seqdata
linenum = 1
mark = 0
seqname = ""
feature = ""
text = ""
seqdata = ""
# init vars for state machine
$(Automa.generate_init_code(context, stockholm_machine))
p_end = p_eof = lastindex(data)
# main loop over input data
$(Automa.generate_exec_code(context, stockholm_machine, stockholm_actions))
if cs != 0
error("failed to parse on line ", linenum)
end
return MSA{T}(; seq=sequences, GF=gf_records, GC=gc_records,
GS=gs_records, GR=gr_records)
end
# 后边还有派发Base.print(), 与解析器无关, 就不粘贴了
end # module BioStockholm