甲斐性なしのブログ

うどんくらいしか食べる気がしない

【Python】動的計画法によるシーケンシャルデータのセグメンテーション

学生の頃、表題のアルゴリズムを研究に応用しようとしていましたが、すっかり忘れているので、思い出すために改めてPythonで実装してみました。当時は、matlabでfor文を回して実現していましたが、今回は、勉強がてらラムダ式やら再帰呼び出しやらを駆使して書いてみようと思います。

シーケンシャルデータのセグメンテーションとは

まず、今回取り上げる問題はどういうものか、等間隔でサンプリングされた離散的な時系列データを例にとって説明します。下図のように、時系列データをk個のセグメント分割すると、それぞれのセグメントごとに何らかのスコアS_iが得られるとします。さて、セグメントの境界をどこに置けば、スコアの合計値J = \sum_{i=1}^{k} S_iを最大化できるのでしょう?というのが今回解きたい問題です。

具体的な応用を1つ挙げると時系列データの近似があります。例えば、下図のようにセグメント内の平均値で時系列データを近似し、その二乗誤差平均が最小になるようなセグメンテーションを求めるというものがあります。

これを、上記のスコア最大化という文脈に当てはめると以下のような評価関数になります。
  \displaystyle J = \sum_{i=1}^{k} S_i =  \sum_{i=1}^{k}\left( - \frac{1}{t - t_{i-1}}\sum_{t = t_{i - 1}}^{t_{i} - 1} \left(f(t) - \mu_i \right)^2 \right)
ここでは評価関数の最大化という問題に当てはめたいので、誤差の二乗平均にマイナス付与したものをスコアとしています。ここでの時系列データは離散的にサンプリングされたデータであり、そのサンプル数は有限個(N個)とするので、t_i0 \leq t_i < t_{i+1} \leq N , t_0 = 0 , t_k = Nである整数です。このJを最大化する\{t_0 , t_1 , \cdots , t_k \}の組み合わせを求めるために、今回取り上げる動的計画法(Dynamic Programming, 以下DP)がよく利用されます。

DPをどのように適用するか

DPは1950年代にBellmanによって考案されたアルゴリズムで、DPを応用した関数のセグメンテーション近似手法も同著者によって提案されています*1。DPの基本的な事項は下記の書籍が参考になります。僕もDPの元論文ではなく、この本を参考にセグメンテーションを実装しています。

これなら分かる最適化数学―基礎原理から計算手法まで

これなら分かる最適化数学―基礎原理から計算手法まで

さて、各セグメントのスコアS_iはセグメントを囲む境界位置t_{i-1},t_iの関数になります。そのため、S_i = h \left(t_{i-1} , t_i \right)とすると解きたい問題は以下のようになります。

\displaystyle \hat{J} = \max_{t_1 , t_2 , \cdots , t_{k-1}}\left(h(0 , t_1) + h(t_1 , t_2) + h(t_2 , t_3) + \cdots + h(t_{k-1} , N)  \right)

{\rm s.t.} \hspace{1em}  t_i \in {\mathbb N}  \hspace{1em} , \hspace{1em} t_{i - 1} < t_i

これは次のような再帰的な部分問題に分割することができます。
f_1(t_1) = h(0 , t_1)
\displaystyle f_2(t_2) = \max_{t_1} \left(f_1(t_1) + h(t_1 , t_2)\right) \hspace{1cm} {\rm s.t.} \hspace{1em} 1 \leq t_1 < t_2
\vdots
 \displaystyle f_i(t_i) = \max_{t_{i-1}} \left(f_i(t_{i-1}) + h(t_{i-1} , t_{i})\right)  \hspace{1cm} {\rm s.t.} \hspace{1em} i - 1 \leq t_{i - 1}  < t_i
\vdots
 \displaystyle \hat{J} = \max_{t_{k - 1}} \left(f_{k - 1}(t_{k - 1}) +  h(t_{k-1} , N)\right) \hspace{1cm} {\rm s.t.} \hspace{1em} k - 2 \leq t_{k - 1}  < N

ここで、f_2(t_2)を求めることが考えてみると、各t_2の値に対して、f_1(t_1) + h(t_1 , t_2)を最大にするt_1を探索しなければなりません。そのため、毎回f_1(t_1)の計算結果が必要になります。その際、f_1(t_1)を予め求めその結果を保持しておけば、計算時間の節約になります。同様にf_3(t_3)を求めるときも、予めf_2(t_2)の結果を保持しておけば、時間的に効率が良いです。このように途中までの計算結果を保持しておき、次のステップで利用するというのはまさに動的計画法の考え方です。
この操作を繰り返していくと、最終的には目的関数の最大値\hat{J}とその時のt_{k-1}が求まります。t_{k-1}が分かれば、f_{k-1}(t_{k-1})を最大にするt_{k-2}は既に計算して保持してあるはずなので、すぐにわかります。同様に遡っていくことで、最適なt_1まで求めることができます。これが動的計画法を利用したシーケンスデータのセグメンテーションアルゴリズムです。

動的計画法を実現する再帰関数

ここから、書いたソースコードを紹介していきます。なお、いつものように

import numpy as np

としています。では、まずセグメンテーションの起点となる関数です。

def SequenceSegmentation(SequenceData , k):
    f_ini = lambda t : (SequenceData.h(0 , t) , 0)
    ResultList = []
    DPSegmentation(SequenceData , k , f_ini , 1 , ResultList)
    ResultList.reverse()
    T = np.zeros(k + 1)
    T[1:k] = np.array(ResultList)[: , 1]
    T[k] = SequenceData.GetLength()
    return T

シーケンシャルデータのオブジェクト(詳しくは後述)とセグメント数kを与え、最適なセグメント境界位置を返します。実際に動的計画法でセグメンテーションを行う部分は、この関数で呼びだしているDPSegmentationという再帰関数です。f_iniは上述のf_1(t_1)にあたる関数オブジェクトです。ただし、上記の数式上ではf_i(t_i)その値を返す関数ですが、プログラムでは値とその時に選ばれたt_{i-1}のタプル(つまり、\max\rm{argmax}のタプル)を返す関数としています。f_iniも同様で、t_{i-1} = t_0 = 0なのでタプルの第2要素を全て0とします。これを初期値としてDPSegmentationに与えると、ResultListに各セグメントの境界位置(実際には、それとf_i(t_i)の値のタプル)がリストで得られるという寸法です。
ということで、その動的計画法再帰関数です。

def DPSegmentation(SequenceData , k , f_old , i , ResultList):
    if i == k - 1:
        N = SequenceData.GetLength()
        Result = GetMax((lambda t_old: f_old(t_old)[0] +
                            SequenceData.h(t_old , N )) , range(i , N ) )   #(4)
        ResultList.append(Result)
        return Result
    g = lambda t: lambda t_old: f_old(t_old)[0] + SequenceData.h(t_old , t) 
    f = lambda t: GetMax(g(t) , range(i , t))                               #(1)
    f = MemoizeFunc(f)                                                      #(2)
    Result = DPSegmentation(SequenceData , k , f , i + 1 , ResultList)      #(3)
    ResultList.append(f(Result[1]))
    return f(Result[1])

第1引数はシーケンシャルデータクラスから生成したオブジェクトです。見て分かる通りシーケンスの長さ(N)を返すメソッドGetLength()と2つの引数(セグメントの境界位置)を受け取りセグメントのスコアを返すh(t1 , t2)が定義されている必要があります。第2引数はセグメント数k、第3引数のf_oldは前ステップで作成した上述の関数f_i(t_i)にあたる関数オブジェクトを入れます。第4引数iはそのf_oldの添え字番号を表し、第5引数のリストに計算結果を詰めていきます。
ソースコードにコメントで番号をふったので、以下その番号に沿って説明していきます。

(1)関数オブジェクトfの作成

ここでは、f(t) = \max_{t_{old}} \left( f_{old}(t_{old}) + h(t , t_{old}) \right)として機能する関数オブジェクトfを作成しています。一行ですぱっと書く方法が分からなかったので、まずg(t , t_{old}) = f_{old}(t_{old}) + h(t , t_{old})を定義しています。ソースコード上でgはカリー化された関数として定義しており、g(a)としたときには、t=aと固定し1つの引数t_oldのみ受け取る新たな関数が返ってくることに注意してください。そして、次にf(t) = \max_{t_{old}} \left( g(t , t_{old}) \right)とするため、g(t)として返ってきた関数をGetMaxに渡しています。
関数GetMaxのソースコードは以下の通りです。これは関数と探索する値のリストを引数に与え、その探索範囲で関数の最大値を見つけるものです。初期化のところでも述べたように、f(t)\max\rm{argmax}のタプルを返すようにしたいので、GetMax最大値と最大にする変数の値のタプルを返すようにしています。

def GetMax(func , SearchVars):
    max_var = SearchVars[0]
    max_value = func(SearchVars[0])
    for var in SearchVars:
        if func(var) >  max_value:
            max_value =  func(var)
            max_var = var
    return max_value , max_var
(2)関数のメモ化

この実装では関数のメモ化というテクニックを利用して、動的計画法の肝となる途中までの計算結果の保持を各f_iにやらせています。メモ化とは1度引数として与えて計算した値は保持しておいて、再度同様の引数が与えられたときに再計算する必要をなくすような関数にすることをいいます。メモ化に関してはM.Hiroiさんのホームページを参考にしました。
本実装では引数で与えた関数をメモ化してくれる関数をMemoizeFuncと定義しています。そのソースコードを以下に示しますが、ほとんど先ほど挙げた参考ページのまるパクリなのでそちらをみた方がいいと思います。

def MemoizeFunc(func):
    table = {}
    print table
    def memoized_func(*args):
        if not args in table:
            table[args] = func(*args)
        return table[args]
    return memoized_func
(3)再帰呼び出し

f(t)にあたる関数オブジェクトも作成したので、それを引数に与えてDPSegmentationを再帰呼び出しして次のステップに移ります。当然、fの添え字番号にあたるiもインクリメントして与えます。

(4)再帰の終着点

i==k-1ということは\hat{J} = \max_{t_{k - 1}} \left(f_{k - 1}(t_{k - 1}) +  h(t_{k-1} , N)\right) のところまで来たということになります。ここにきて初めて関数GetMaxが呼ばれます。その時、i==k-1のときのfの戻り値が必要なので、そのfが呼び出されますが、また、GetMax関数があり更にその中でi==k-2のときのfが呼び出され・・・という動作が繰り返され、最終的にはi==1のとき、つまりf_iniで定義した関数が呼び出されるという流れになります。従って、実質的な計算はこの段階に来て初めて行われ、それまでの過程は計算のための準備(関数オブジェクトの作成、メモ化)といえます。
後は、ResultListに各f(t)の値を詰めていけば終了です。

時系列データのクラス

上述の通り、SequenceSegmentationに渡すSequenceDataはGetLength()とh(t1,t2)のメソッドが定義されているオブジェクトである必要があります。また、今回は実数値の時系列データを取り扱うので、__init__()でnumpy.array型の実数値配列データを受け取り、これを時系列データの実際の数値とします。ということで時系列データクラスの定義は以下の通りです。

class TimeSeriesData:
    def __init__(self , Data):
        self.Data = Data

    def GetLength(self):
        return self.Data.shape[0]

    def h(self , t1 , t2):
        S = self.Data[t1:t2] - np.mean(self.Data[t1:t2])
        return  -np.sum(S*S)/(t2 - t1)

スコアh(t1 , t2)は、セグメント内の分散にマイナスをつけたものとしました。これで、このクラスから生成したオブジェクトをSequenceSegmentationに突っ込めば、時系列データをセグメント内の平均値で近似した時に誤差が最も小さくなるようなセグメンテーションが得られます。

実行と結果

以下のように、インタプリタに打ち込んで実行しました。

>>> data = np.sin(np.arange(0, np.pi * 4 ,(np.pi * 4)/150.0)) + 0.25 * np.random.randn(150)
>>> Sequence = TimeSeriesData(data)
>>> T = SequenceSegmentation(Sequence , 5)

データとしては2周期分の正弦波にテキトーなノイズをのせ、150点サンプリングしたものにしました。それを5つのセグメントに分割するように実行しています。ということで結果は以下の通り。元の時系列データの各点を青で示し、各セグメントを平均値で近似した曲線は赤で示しています。黒の線はセグメントの境界です。

大体、元々の正弦波が0と交差する付近に各セグメントの境界が来ています。流石にセグメンテーション数k=5なので、近似という意味ではいまいちかもしれませんが、時系列的な推移の概略は分かるものになっていると思います。

スコアの計算方法をかえるだけで様々なセグメンテーションが可能

先ほどの例では、セグメント内の分散にマイナスをつけたものをセグメント内のスコアとしましたが、別の時系列データクラスを定義し、そのh(t1 , t2)を新たに定義することで、また別のセグメンテーションが得られます。
例えば、以下のコードはセグメントの境界位置にあたる点同士を直線で結び、その直線と実際の点の誤差の二乗平均にマイナスつけたものをスコアにしています。こうすることで、セグメントの境界の点同士を直線補間して得られる曲線が最も元の時系列データを近似するようなセグメントの境界位置を得ることができます。

class TimeSeriesLerp(TimeSeriesData):

    def AproximateSubSequence(self , t1 , t2):
        if t2 < self.GetLength():
            x1 = t1
            x2 = t2
        else:
            x1 = t1
            x2 = t2 - 1
        a = (self.Data[x2] - self.Data[x1]) / (x2 - x1)
        b = (x2 * self.Data[x1] - x1 * self.Data[x2])/(x2 - x1)
        return (a * np.arange(t1 , t2) + b)

    def h(self , t1 , t2):
        S = self.Data[t1:t2] - self.AproximateSubSequence(t1 , t2)
        return  -np.sum(S*S)/(t2 - t1)

面倒だったので、先ほどのTimeSeriesDataクラスを継承してhをオーバーライドしています。AproximateSubSequenceで直線補間値を得て、誤差の二乗平均を計算します。プログラム上、最後の境界位置をt_k = Nとしていますが、Data[N]はIndexErrorになるので、最後のセグメントだけ、セグメントの始点とN-1の点を結ぶようにするためif文で分岐させています。そしてこれを下記のようにしてセグメンテーションします。

>>> Sequence = TimeSeriesLerp(data)
>>> T = SequenceSegmentation(Sequence , 5)

dataは上述と同じ正弦波にノイズを乗せたものです。そしてセグメンテーションの結果は以下の通り。

平均で近似した時と違い、振幅の最大値・最小値付近に各セグメントの境界が来ています。近似と言う意味では、こちらの線形補間近似の方がより良く近似できていると思います。実際、得られたスコアもこちらの方が高い(つまり、誤差が小さい)です。

まとめ

ラムダ式やメモ化、再帰を利用すれば、かなりすっきりした形でシーケンシャルデータのセグメンテーションを実装できるなと感じました。ただ、これらに対する理解がまだまだ浅いので、無駄な事をしている箇所やもっと効率化できる部分があるかもしれません。そういった点があればご指摘いただければ幸いです。
また、今回は時系列データの近似のみを考えましたが、セグメント毎にスコアが得られるシーケンシャルデータであれば同様のアルゴリズムが適用できるので、なんか面白そうな題材があれば次は、それを試してみたいと思います。

*1:On the Approximation of Curves by Line Segments Using Dynamic Programming [R.Bellman , 1961]