学生の頃、表題のアルゴリズムを研究に応用しようとしていましたが、すっかり忘れているので、思い出すために改めてPythonで実装してみました。当時は、matlabでfor文を回して実現していましたが、今回は、勉強がてらラムダ式やら再帰呼び出しやらを駆使して書いてみようと思います。
ここから、書いたソースコードを紹介していきます。なお、いつものように
import numpy as np
としています。では、まずセグメンテーションの起点となる関数です。
def SequenceSegmentation(SequenceData , k):
f_ini = lambda t : (SequenceData.h(0 , t) , 0)
ResultList = []
DPSegmentation(SequenceData , k , f_ini , 1 , ResultList)
ResultList.reverse()
T = np.zeros(k + 1)
T[1:k] = np.array(ResultList)[: , 1]
T[k] = SequenceData.GetLength()
return T
シーケンシャルデータのオブジェクト(詳しくは後述)とセグメント数を与え、最適なセグメント境界位置を返します。実際に動的計画法でセグメンテーションを行う部分は、この関数で呼びだしているDPSegmentationという再帰関数です。f_iniは上述のにあたる関数オブジェクトです。ただし、上記の数式上ではその値を返す関数ですが、プログラムでは値とその時に選ばれたのタプル(つまり、とのタプル)を返す関数としています。f_iniも同様で、なのでタプルの第2要素を全て0とします。これを初期値としてDPSegmentationに与えると、ResultListに各セグメントの境界位置(実際には、それとの値のタプル)がリストで得られるという寸法です。
ということで、その動的計画法の再帰関数です。
def DPSegmentation(SequenceData , k , f_old , i , ResultList):
if i == k - 1:
N = SequenceData.GetLength()
Result = GetMax((lambda t_old: f_old(t_old)[0] +
SequenceData.h(t_old , N )) , range(i , N ) )
ResultList.append(Result)
return Result
g = lambda t: lambda t_old: f_old(t_old)[0] + SequenceData.h(t_old , t)
f = lambda t: GetMax(g(t) , range(i , t))
f = MemoizeFunc(f)
Result = DPSegmentation(SequenceData , k , f , i + 1 , ResultList)
ResultList.append(f(Result[1]))
return f(Result[1])
第1引数はシーケンシャルデータクラスから生成したオブジェクトです。見て分かる通りシーケンスの長さ()を返すメソッドGetLength()と2つの引数(セグメントの境界位置)を受け取りセグメントのスコアを返すh(t1 , t2)が定義されている必要があります。第2引数はセグメント数、第3引数のf_oldは前ステップで作成した上述の関数にあたる関数オブジェクトを入れます。第4引数iはそのf_oldの添え字番号を表し、第5引数のリストに計算結果を詰めていきます。
ソースコードにコメントで番号をふったので、以下その番号に沿って説明していきます。
(2)関数のメモ化
この実装では関数のメモ化というテクニックを利用して、動的計画法の肝となる途中までの計算結果の保持を各にやらせています。メモ化とは1度引数として与えて計算した値は保持しておいて、再度同様の引数が与えられたときに再計算する必要をなくすような関数にすることをいいます。メモ化に関してはM.Hiroiさんのホームページを参考にしました。
本実装では引数で与えた関数をメモ化してくれる関数をMemoizeFuncと定義しています。そのソースコードを以下に示しますが、ほとんど先ほど挙げた参考ページのまるパクリなのでそちらをみた方がいいと思います。
def MemoizeFunc(func):
table = {}
print table
def memoized_func(*args):
if not args in table:
table[args] = func(*args)
return table[args]
return memoized_func
にあたる関数オブジェクトも作成したので、それを引数に与えてDPSegmentationを再帰呼び出しして次のステップに移ります。当然、fの添え字番号にあたるiもインクリメントして与えます。
(4)再帰の終着点
i==k-1ということは のところまで来たということになります。ここにきて初めて関数GetMaxが呼ばれます。その時、i==k-1のときのfの戻り値が必要なので、そのfが呼び出されますが、また、GetMax関数があり更にその中でi==k-2のときのfが呼び出され・・・という動作が繰り返され、最終的にはi==1のとき、つまりf_iniで定義した関数が呼び出されるという流れになります。従って、実質的な計算はこの段階に来て初めて行われ、それまでの過程は計算のための準備(関数オブジェクトの作成、メモ化)といえます。
後は、ResultListに各の値を詰めていけば終了です。
時系列データのクラス
上述の通り、SequenceSegmentationに渡すSequenceDataはGetLength()とh(t1,t2)のメソッドが定義されているオブジェクトである必要があります。また、今回は実数値の時系列データを取り扱うので、__init__()でnumpy.array型の実数値配列データを受け取り、これを時系列データの実際の数値とします。ということで時系列データクラスの定義は以下の通りです。
class TimeSeriesData:
def __init__(self , Data):
self.Data = Data
def GetLength(self):
return self.Data.shape[0]
def h(self , t1 , t2):
S = self.Data[t1:t2] - np.mean(self.Data[t1:t2])
return -np.sum(S*S)/(t2 - t1)
スコアh(t1 , t2)は、セグメント内の分散にマイナスをつけたものとしました。これで、このクラスから生成したオブジェクトをSequenceSegmentationに突っ込めば、時系列データをセグメント内の平均値で近似した時に誤差が最も小さくなるようなセグメンテーションが得られます。
実行と結果
以下のように、インタプリタに打ち込んで実行しました。
>>> data = np.sin(np.arange(0, np.pi * 4 ,(np.pi * 4)/150.0)) + 0.25 * np.random.randn(150)
>>> Sequence = TimeSeriesData(data)
>>> T = SequenceSegmentation(Sequence , 5)
データとしては2周期分の正弦波にテキトーなノイズをのせ、150点サンプリングしたものにしました。それを5つのセグメントに分割するように実行しています。ということで結果は以下の通り。元の時系列データの各点を青で示し、各セグメントを平均値で近似した曲線は赤で示しています。黒の線はセグメントの境界です。
大体、元々の正弦波が0と交差する付近に各セグメントの境界が来ています。流石にセグメンテーション数なので、近似という意味ではいまいちかもしれませんが、時系列的な推移の概略は分かるものになっていると思います。
スコアの計算方法をかえるだけで様々なセグメンテーションが可能
先ほどの例では、セグメント内の分散にマイナスをつけたものをセグメント内のスコアとしましたが、別の時系列データクラスを定義し、そのh(t1 , t2)を新たに定義することで、また別のセグメンテーションが得られます。
例えば、以下のコードはセグメントの境界位置にあたる点同士を直線で結び、その直線と実際の点の誤差の二乗平均にマイナスつけたものをスコアにしています。こうすることで、セグメントの境界の点同士を直線補間して得られる曲線が最も元の時系列データを近似するようなセグメントの境界位置を得ることができます。
class TimeSeriesLerp(TimeSeriesData):
def AproximateSubSequence(self , t1 , t2):
if t2 < self.GetLength():
x1 = t1
x2 = t2
else:
x1 = t1
x2 = t2 - 1
a = (self.Data[x2] - self.Data[x1]) / (x2 - x1)
b = (x2 * self.Data[x1] - x1 * self.Data[x2])/(x2 - x1)
return (a * np.arange(t1 , t2) + b)
def h(self , t1 , t2):
S = self.Data[t1:t2] - self.AproximateSubSequence(t1 , t2)
return -np.sum(S*S)/(t2 - t1)
面倒だったので、先ほどのTimeSeriesDataクラスを継承してhをオーバーライドしています。AproximateSubSequenceで直線補間値を得て、誤差の二乗平均を計算します。プログラム上、最後の境界位置をとしていますが、Data[N]はIndexErrorになるので、最後のセグメントだけ、セグメントの始点とN-1の点を結ぶようにするためif文で分岐させています。そしてこれを下記のようにしてセグメンテーションします。
>>> Sequence = TimeSeriesLerp(data)
>>> T = SequenceSegmentation(Sequence , 5)
dataは上述と同じ正弦波にノイズを乗せたものです。そしてセグメンテーションの結果は以下の通り。
平均で近似した時と違い、振幅の最大値・最小値付近に各セグメントの境界が来ています。近似と言う意味では、こちらの線形補間近似の方がより良く近似できていると思います。実際、得られたスコアもこちらの方が高い(つまり、誤差が小さい)です。
まとめ
ラムダ式やメモ化、再帰を利用すれば、かなりすっきりした形でシーケンシャルデータのセグメンテーションを実装できるなと感じました。ただ、これらに対する理解がまだまだ浅いので、無駄な事をしている箇所やもっと効率化できる部分があるかもしれません。そういった点があればご指摘いただければ幸いです。
また、今回は時系列データの近似のみを考えましたが、セグメント毎にスコアが得られるシーケンシャルデータであれば同様のアルゴリズムが適用できるので、なんか面白そうな題材があれば次は、それを試してみたいと思います。