文本diff算法
➜ algorithm git:(main) ✗ cat a.txt
A
B
C
A
B
B
A
C
➜ algorithm git:(main) ✗ cat b.txt
D
C
B
A
E
B
A
E
➜ algorithm git:(main) ✗ diff a.txt b.txt
1,2c1
< A
< B
---
> D
4d2
< A
5a4,5
> A
> E
8c8
< C
---
> E
上面是mac os下diff指令的输出,假设让你自己来实现,你会怎么写呢?
问题定义
首先,我们要先把问题简化并定义好:给定两个数组A和B,求把A转化成B的最小步骤。
错误想法
简单,遍历A的每个元素,如果不在B中,则标记为删除,然后遍历B中的每个元素,如果不在A中,则标记为添加。
to_del = []
for i, x in enumerate(src):
if x not in dest:
to_del.append(i)
to_add = []
for i, x in enumerate(dest):
if x not in src:
to_add.append(i)
但是这种输出会变成在A中删除某些元素,在B中添加某些元素,这并不正确,因为所有操作都应该是在A上的。
好,再来。这次同时遍历A和B,如果A[i]等于B[i],则跳过,否则先删除A[i],然后添加B[i]。
to_add = []
to_del = []
i = 0
while i < len(src):
x = src[i]
y = dest[i]
if x == y:
i += 1
else:
to_del.append(i)
to_add.append((i, y))
但是这种写法,实际上只能应付修改,一旦遇到添加和删除,后续就算有相同的元素也不会被识别出来。
好,我想不到办法了,还是来看看别人家的孩子吧。
最长公共子序列
上面错误想法的产生很大程度上是因为没有立足于"最小步骤",或者说在解题前根本就忽略了要先把问题定义好。
要求步骤最小,其实答案已经比较明显了,尽可能地保留A与B中相同的内容,也就是先求出A与B的最长公共子序列C,然后删除A与C之间的差异,添加C与B之间的差异。细节在于,要保留C的路径用于计算差异,并且是针对A和B各一个。
#coding=utf-8
UP_LEFT = 0
UP = 1
LEFT = 2
def _text(i, j):
if i == j:
return str(i)
else:
return '%s,%s' % (i, j)
def print_operations(src, dest, lcs_src, lcs_dest):
'''
@lcs_x: lcs在x中的下标列表(0-index base)
'''
i = 0
j = 0
k = 0
# lcs以外的末端也要处理
lcs_a = list(lcs_src)
lcs_b = list(lcs_dest)
lcs_a.append(len(src))
lcs_b.append(len(dest))
while k < len(lcs_a):
# 删除src[i, next_a),添加dest[j, next_b),如果同时存在删除和添加,则转为修改
next_a = lcs_a[k]
next_b = lcs_b[k]
if i < next_a and j < next_b:
text_a = _text(i+1, next_a)
text_b = _text(j+1, next_b)
print '%sc%s' % (text_a, text_b)
gap = True
elif i < next_a:
text_a = _text(i+1, next_a)
print '%sd%d' % (text_a, j)
gap = False
elif j < next_b:
text_b = _text(j+1, next_b)
print '%da%s' % (i, text_b)
gap = False
else:
gap = False
while i < next_a:
print "< %s" % src[i]
i += 1
if gap:
print "---"
while j < next_b:
print "> %s" % dest[j]
j += 1
# 跳过lcs中的元素
i += 1
j += 1
k += 1
def diff(src, dest):
m = len(src)
n = len(dest)
# dynamic partion table,行为src,列为dest,所以len(dp) == len(src),len(dp[0]) == len(dest)
# dp[i][j]为src_i与dest_j的最长公共子序列的长度
dp = [[0] * (n + 1) for _ in range(m + 1)]
# trace记录了dp的转移关系,也就是lcs的扩展规则,转移关系
trace = [[0] * (n + 1) for _ in range(m + 1)]
# 进行状态转移
for i in range(1, m + 1):
for j in range(1, n + 1):
if src[i - 1] == dest[j - 1]:
# src_i与dest_j的lcs为src_i-1与dest_j-1的lcs加上src[i]
dp[i][j] = 1 + dp[i-1][j-1]
trace[i][j] = UP_LEFT
else:
dp[i][j] = max(dp[i-1][j], dp[i][j-1])
if dp[i-1][j] > dp[i][j-1]:
# src_i与dest_j的lcs等于src_i-1与dest_j的lcs
trace[i][j] = UP
else:
# src_i与dest_j的lcs等于src_i与dest_j-1的lcs
trace[i][j] = LEFT
lcs_a = []
lcs_b = []
lcs = []
while i > 0 and j > 0:
from_pos = trace[i][j]
if from_pos == UP_LEFT:
# src[i]为当前lcs的末尾字符
lcs.append(src[i-1])
lcs_a.append(i-1)
lcs_b.append(j-1)
# lcs回溯
i -= 1
j -= 1
elif from_pos == LEFT:
# lcs回溯
j -= 1
else:
# lcs回溯
i -= 1
lcs_a.reverse()
lcs_b.reverse()
print_operations(src, dest, lcs_a, lcs_b)
if __name__ == '__main__':
import sys
src = sys.argv[1]
dest = sys.argv[2]
diff(src, dest)
结果输出:
➜ code git:(master) ✗ python lcs.py ABCABBAC DCBAEBAE
1,2c1
< A
< B
---
> D
4d2
< A
5a4,5
> A
> E
8c8
< C
---
> E
Well done!
然而还有更天才的做法。
Myers算法
Myers针对两个文本,建立了一张类似lcs算法的dp表,但实际上是一个图的概念。
定义从左往右移动代价为1,代表删除;从上往下移动代价为1,代表添加;沿对角线移动代价为0,代表保留。对角线只有在src[x]等于dest[y]时才能走,而另外两种移动方式无条件能用。
于是就把寻找"最小步骤"转化为了从(0, 0)到(n, m)的最短路径寻路问题。
那怎么走才是最短的呢?比较直观的走法就是尽量走对角线,如果从起点往右走一步就有一条直达终点的对角线,那自然不会在起点时选择往下的方向走了。(我隐约觉得Myers比lcs快,就在于走对角线减少了一大堆计算)。
但是,问题来了,优先走对角线一定是最快的吗?不一定,可能当前这条对角线很短,而先往右或先往下走一步会有一条更长的对角线。所以Myers是一个贪心算法,并不保证得出的是最短路径。
算法设置了两层循环,外层循环增加移动的步数,内层循环更新每种选择的最远到达目标,当其中一条路径到达目标点时,则循环结束。
当然这只是我这种俗人的表述,Myers增加了k线的概念,每个点(x, y)所在的k线为x-y。移动就相当于在k线间切换,例如从(x, y)往右移动一步至(x+1, y),设k=x-y,则相当于是从k移动到了k+1。同理,往下走一步,即从k移动到k-1,而沿对角线移动则是更新k线的最远到达坐标(规定一定要沿对角线走到头)。
定义好k线后,内循环就是更新图中k线的最远到达目标,直至k线n-m到达终点。在移动时记录移动信息,在到达终点时即可开始回溯输出路径,并转译为对应的文本操作。
#coding=utf-8
ADD = 1
DEL = 2
CHANGE = 3
def _text(i, j):
if i == j:
return str(i)
else:
return '%s,%s' % (i, j)
class Move(object):
def __init__(self, p1, p2, op):
self.p1 = p1
self.p2 = p2
self.op = op
class Route(object):
def __init__(self):
self.moves = []
def add(self, p1, p2, op):
if not self.moves:
self.moves.append(Move(p1, p2, op))
else:
last_move = self.moves[0]
if last_move.p1 == p2:
last_move.p1 = p1
if last_move.op != op: last_move.op = CHANGE
else:
self.moves.insert(0, Move(p1, p2, op))
def dump(self, src, dest):
for move in self.moves:
gap = False
if move.op == DEL:
x1 = move.p1[0]
x2 = move.p2[0]
y = move.p1[1]
y1 = 0
y2 = 0
print '%sd%d' % (_text(x1+1, x2), y)
elif move.op == ADD:
x1 = 0
x2 = 0
y1 = move.p1[1]
y2 = move.p2[1]
x = move.p1[0]
print '%da%s' % (x, _text(y1+1, y2))
else:
x1 = move.p1[0]
x2 = move.p2[0]
y1 = move.p1[1]
y2 = move.p2[1]
gap = True
print '%sc%s' % (_text(x1+1, x2), _text(y1+1, y2))
while x1 < x2:
print "< %s" % src[x1]
x1 += 1
if gap:
print "---"
while y1 < y2:
print "> %s" % dest[y1]
y1 += 1
def diff(src, dest):
n = len(src)
m = len(dest)
# k = x - y, v[k] = x
# v记录在指定步数d下k线能到达的最远目标点
v = {}
# d为0时需要预设last_v[1]的值
v[1] = 0
# trace保存每一步的v记录
trace = []
# 有可能一步都不用走,即src与dest完全相等,所以d从0开始
d = 0
found = False
while d <= n + m:
# last_v为上一轮的v
last_v, v = v, {}
# 所以trace[d]对应的是上一轮d-1的v
trace.append(last_v)
k = -d
while k <= d:
if k == -d:
down = True
elif k == d:
down = False
elif last_v[k-1] < last_v[k+1]:
down = True
else:
down = False
# down表示是否是从上一个点向下走过来的
if down:
x = last_v[k+1]
else:
x = last_v[k-1] + 1
y = x - k
while x < n and y < m and src[x] == dest[y]:
x += 1
y += 1
v[k] = x
# 注意trace不需要保存到达终点时步数对应的这一次v
if x == n and y == m:
found = True
break
k += 2
if found:
break
d += 1
operations = []
route = Route()
while d >= 0:
# 注意trace[d]对应的是d-1的v
last_v = trace[d]
# 当前的k线
k = x - y
# 只可能从k-1或k+1来,根据规则可以判断出到底是从哪来
if k == -d:
down = True
elif k == d:
down = False
elif last_v[k-1] < last_v[k+1]:
down = True
else:
down = False
if down:
last_k = k + 1
else:
last_k = k - 1
# 再将k线移动转化为具体的文本操作
# print "d: %s, k: %s, x: %s, y: %s, last_k: %s" % (d, k, x, y, last_k)
last_x = last_v[last_k]
last_y = last_x - last_k
while x > last_x and y > last_y:
x -= 1
y -= 1
op = 'move'
c = src[x]
operations.append((op, c))
# d等于0时其实已经回溯完了
if d > 0:
if down:
route.add((x, y-1), (x, y), ADD)
y = y - 1
else:
route.add((x-1, y), (x, y), DEL)
x = x - 1
d -= 1
route.dump(src, dest)
if __name__ == '__main__':
import sys
src = sys.argv[1]
dest = sys.argv[2]
diff(src, dest)
输出:
➜ code git:(master) ✗ python myers.py ABCABBAC DCBAEBAE
1,2c1
< A
< B
---
> D
3a3
> B
4a5
> E
6d6
< B
8c8
< C
---
> E
可以看到跟lcs得出的结果并不一致,步骤比lcs要多,但是已经足够优秀。
算法复杂度对比一下,lcs是O(n * m),而Myers是O(d * d),其中d是最终的操作步数,明显,文本差异越少,d就越小。而相比之下,lcs就算两个文本完全一样,依然要把整个dp表填满才肯罢休。