学生の頃、表題のアルゴリズムを研究に応用しようとしていましたが、すっかり忘れているので、思い出すために改めてPythonで実装してみました。当時は、matlabでfor文を回して実現していましたが、今回は、勉強がてらラムダ式やら再帰呼び出しやらを駆使して書いてみようと思います。
シーケンシャルデータのセグメンテーションとは
まず、今回取り上げる問題はどういうものか、等間隔でサンプリングされた離散的な時系列データを例にとって説明します。下図のように、時系列データを個のセグメント分割すると、それぞれのセグメントごとに何らかのスコアが得られるとします。さて、セグメントの境界をどこに置けば、スコアの合計値を最大化できるのでしょう?というのが今回解きたい問題です。
具体的な応用を1つ挙げると時系列データの近似があります。例えば、下図のようにセグメント内の平均値で時系列データを近似し、その二乗誤差平均が最小になるようなセグメンテーションを求めるというものがあります。
これを、上記のスコア最大化という文脈に当てはめると以下のような評価関数になります。
ここでは評価関数の最大化という問題に当てはめたいので、誤差の二乗平均にマイナス付与したものをスコアとしています。ここでの時系列データは離散的にサンプリングされたデータであり、そのサンプル数は有限個(個)とするので、はである整数です。このを最大化するの組み合わせを求めるために、今回取り上げる動的計画法(Dynamic Programming, 以下DP)がよく利用されます。
DPをどのように適用するか
DPは1950年代にBellmanによって考案されたアルゴリズムで、DPを応用した関数のセグメンテーション近似手法も同著者によって提案されています*1。DPの基本的な事項は下記の書籍が参考になります。僕もDPの元論文ではなく、この本を参考にセグメンテーションを実装しています。
- 作者: 金谷健一
- 出版社/メーカー: 共立出版
- 発売日: 2005/09/01
- メディア: 単行本
- 購入: 29人 クリック: 424回
- この商品を含むブログ (42件) を見る
さて、各セグメントのスコアはセグメントを囲む境界位置の関数になります。そのため、とすると解きたい問題は以下のようになります。
これは次のような再帰的な部分問題に分割することができます。
ここで、を求めることが考えてみると、各の値に対して、を最大にするを探索しなければなりません。そのため、毎回の計算結果が必要になります。その際、を予め求めその結果を保持しておけば、計算時間の節約になります。同様にを求めるときも、予めの結果を保持しておけば、時間的に効率が良いです。このように途中までの計算結果を保持しておき、次のステップで利用するというのはまさに動的計画法の考え方です。
この操作を繰り返していくと、最終的には目的関数の最大値とその時のが求まります。が分かれば、を最大にするは既に計算して保持してあるはずなので、すぐにわかります。同様に遡っていくことで、最適なまで求めることができます。これが動的計画法を利用したシーケンスデータのセグメンテーションアルゴリズムです。
動的計画法を実現する再帰関数
ここから、書いたソースコードを紹介していきます。なお、いつものように
import numpy as np
としています。では、まずセグメンテーションの起点となる関数です。
def SequenceSegmentation(SequenceData , k): f_ini = lambda t : (SequenceData.h(0 , t) , 0) ResultList = [] DPSegmentation(SequenceData , k , f_ini , 1 , ResultList) ResultList.reverse() T = np.zeros(k + 1) T[1:k] = np.array(ResultList)[: , 1] T[k] = SequenceData.GetLength() return T
シーケンシャルデータのオブジェクト(詳しくは後述)とセグメント数を与え、最適なセグメント境界位置を返します。実際に動的計画法でセグメンテーションを行う部分は、この関数で呼びだしているDPSegmentationという再帰関数です。f_iniは上述のにあたる関数オブジェクトです。ただし、上記の数式上ではその値を返す関数ですが、プログラムでは値とその時に選ばれたのタプル(つまり、とのタプル)を返す関数としています。f_iniも同様で、なのでタプルの第2要素を全て0とします。これを初期値としてDPSegmentationに与えると、ResultListに各セグメントの境界位置(実際には、それとの値のタプル)がリストで得られるという寸法です。
ということで、その動的計画法の再帰関数です。
def DPSegmentation(SequenceData , k , f_old , i , ResultList): if i == k - 1: N = SequenceData.GetLength() Result = GetMax((lambda t_old: f_old(t_old)[0] + SequenceData.h(t_old , N )) , range(i , N ) ) #(4) ResultList.append(Result) return Result g = lambda t: lambda t_old: f_old(t_old)[0] + SequenceData.h(t_old , t) f = lambda t: GetMax(g(t) , range(i , t)) #(1) f = MemoizeFunc(f) #(2) Result = DPSegmentation(SequenceData , k , f , i + 1 , ResultList) #(3) ResultList.append(f(Result[1])) return f(Result[1])
第1引数はシーケンシャルデータクラスから生成したオブジェクトです。見て分かる通りシーケンスの長さ()を返すメソッドGetLength()と2つの引数(セグメントの境界位置)を受け取りセグメントのスコアを返すh(t1 , t2)が定義されている必要があります。第2引数はセグメント数、第3引数のf_oldは前ステップで作成した上述の関数にあたる関数オブジェクトを入れます。第4引数iはそのf_oldの添え字番号を表し、第5引数のリストに計算結果を詰めていきます。
ソースコードにコメントで番号をふったので、以下その番号に沿って説明していきます。
(1)関数オブジェクトfの作成
ここでは、として機能する関数オブジェクトfを作成しています。一行ですぱっと書く方法が分からなかったので、まずを定義しています。ソースコード上でgはカリー化された関数として定義しており、g(a)としたときには、t=aと固定し1つの引数t_oldのみ受け取る新たな関数が返ってくることに注意してください。そして、次にとするため、g(t)として返ってきた関数をGetMaxに渡しています。
関数GetMaxのソースコードは以下の通りです。これは関数と探索する値のリストを引数に与え、その探索範囲で関数の最大値を見つけるものです。初期化のところでも述べたように、はとのタプルを返すようにしたいので、GetMax最大値と最大にする変数の値のタプルを返すようにしています。
def GetMax(func , SearchVars): max_var = SearchVars[0] max_value = func(SearchVars[0]) for var in SearchVars: if func(var) > max_value: max_value = func(var) max_var = var return max_value , max_var
(2)関数のメモ化
この実装では関数のメモ化というテクニックを利用して、動的計画法の肝となる途中までの計算結果の保持を各にやらせています。メモ化とは1度引数として与えて計算した値は保持しておいて、再度同様の引数が与えられたときに再計算する必要をなくすような関数にすることをいいます。メモ化に関してはM.Hiroiさんのホームページを参考にしました。
本実装では引数で与えた関数をメモ化してくれる関数をMemoizeFuncと定義しています。そのソースコードを以下に示しますが、ほとんど先ほど挙げた参考ページのまるパクリなのでそちらをみた方がいいと思います。
def MemoizeFunc(func): table = {} print table def memoized_func(*args): if not args in table: table[args] = func(*args) return table[args] return memoized_func
時系列データのクラス
上述の通り、SequenceSegmentationに渡すSequenceDataはGetLength()とh(t1,t2)のメソッドが定義されているオブジェクトである必要があります。また、今回は実数値の時系列データを取り扱うので、__init__()でnumpy.array型の実数値配列データを受け取り、これを時系列データの実際の数値とします。ということで時系列データクラスの定義は以下の通りです。
class TimeSeriesData: def __init__(self , Data): self.Data = Data def GetLength(self): return self.Data.shape[0] def h(self , t1 , t2): S = self.Data[t1:t2] - np.mean(self.Data[t1:t2]) return -np.sum(S*S)/(t2 - t1)
スコアh(t1 , t2)は、セグメント内の分散にマイナスをつけたものとしました。これで、このクラスから生成したオブジェクトをSequenceSegmentationに突っ込めば、時系列データをセグメント内の平均値で近似した時に誤差が最も小さくなるようなセグメンテーションが得られます。
実行と結果
以下のように、インタプリタに打ち込んで実行しました。
>>> data = np.sin(np.arange(0, np.pi * 4 ,(np.pi * 4)/150.0)) + 0.25 * np.random.randn(150) >>> Sequence = TimeSeriesData(data) >>> T = SequenceSegmentation(Sequence , 5)
データとしては2周期分の正弦波にテキトーなノイズをのせ、150点サンプリングしたものにしました。それを5つのセグメントに分割するように実行しています。ということで結果は以下の通り。元の時系列データの各点を青で示し、各セグメントを平均値で近似した曲線は赤で示しています。黒の線はセグメントの境界です。
大体、元々の正弦波が0と交差する付近に各セグメントの境界が来ています。流石にセグメンテーション数なので、近似という意味ではいまいちかもしれませんが、時系列的な推移の概略は分かるものになっていると思います。
スコアの計算方法をかえるだけで様々なセグメンテーションが可能
先ほどの例では、セグメント内の分散にマイナスをつけたものをセグメント内のスコアとしましたが、別の時系列データクラスを定義し、そのh(t1 , t2)を新たに定義することで、また別のセグメンテーションが得られます。
例えば、以下のコードはセグメントの境界位置にあたる点同士を直線で結び、その直線と実際の点の誤差の二乗平均にマイナスつけたものをスコアにしています。こうすることで、セグメントの境界の点同士を直線補間して得られる曲線が最も元の時系列データを近似するようなセグメントの境界位置を得ることができます。
class TimeSeriesLerp(TimeSeriesData): def AproximateSubSequence(self , t1 , t2): if t2 < self.GetLength(): x1 = t1 x2 = t2 else: x1 = t1 x2 = t2 - 1 a = (self.Data[x2] - self.Data[x1]) / (x2 - x1) b = (x2 * self.Data[x1] - x1 * self.Data[x2])/(x2 - x1) return (a * np.arange(t1 , t2) + b) def h(self , t1 , t2): S = self.Data[t1:t2] - self.AproximateSubSequence(t1 , t2) return -np.sum(S*S)/(t2 - t1)
面倒だったので、先ほどのTimeSeriesDataクラスを継承してhをオーバーライドしています。AproximateSubSequenceで直線補間値を得て、誤差の二乗平均を計算します。プログラム上、最後の境界位置をとしていますが、Data[N]はIndexErrorになるので、最後のセグメントだけ、セグメントの始点とN-1の点を結ぶようにするためif文で分岐させています。そしてこれを下記のようにしてセグメンテーションします。
>>> Sequence = TimeSeriesLerp(data)
>>> T = SequenceSegmentation(Sequence , 5)
dataは上述と同じ正弦波にノイズを乗せたものです。そしてセグメンテーションの結果は以下の通り。
平均で近似した時と違い、振幅の最大値・最小値付近に各セグメントの境界が来ています。近似と言う意味では、こちらの線形補間近似の方がより良く近似できていると思います。実際、得られたスコアもこちらの方が高い(つまり、誤差が小さい)です。
まとめ
ラムダ式やメモ化、再帰を利用すれば、かなりすっきりした形でシーケンシャルデータのセグメンテーションを実装できるなと感じました。ただ、これらに対する理解がまだまだ浅いので、無駄な事をしている箇所やもっと効率化できる部分があるかもしれません。そういった点があればご指摘いただければ幸いです。
また、今回は時系列データの近似のみを考えましたが、セグメント毎にスコアが得られるシーケンシャルデータであれば同様のアルゴリズムが適用できるので、なんか面白そうな題材があれば次は、それを試してみたいと思います。
*1:On the Approximation of Curves by Line Segments Using Dynamic Programming [R.Bellman , 1961]