Введение

В прошлом посте мы обсуждали расширенный фильтр Калмана, в этом посте мы рассмотрим неароматизированный фильтр Калмана (UKF) — еще один метод, когда модель наблюдения и/или перехода нелинейна.

Метод называется неароматизированным, потому что применяется неароматизированное преобразование. Когда система нелинейна, мы могли бы взять набор точек сигмы вокруг априора и с определенным отношением к стандартному отклонению применить преобразование и получить набор точек, принадлежащих пространству модели. Затем мы берем среднее значение и дисперсию набора точек и используем их в качестве приближения для истинного среднего и истинной дисперсии системы. Чтобы убедиться, что ваш код работает правильно, спроектируйте линейную систему и примените как UKF, так и KF. У них должен быть один и тот же ответ.

Есть три шага для UKF. Более подробную информацию об UKF можно найти здесь

  1. Генерация сигма-баллов
  2. Как предсказать точки сигмы
  3. Вычислите прогнозируемое среднее значение и ковариацию по прогнозируемым сигма-точкам

Шаг 1: Создайте баллы Sigma

Количество сигма-точек зависит от количества состояний. (размер X в стандартной форме)

две точки для каждого состояния (распределенные по разным направлениям) и дополнительная точка для среднего значения состояний.

значение точек сигмы зависит от предыдущего значения и параметра масштабирования, который определяет отношение к ошибочному затмению (расстояние от среднего).

Обычно мы принимаем каппа за 0, а альфа за небольшую переменную.

Преобразование без запаха закодировано ниже

def unscented_transform(scaling_factor, init_x, init_P):
    n_state = init_x.shape[0]
    sigma_point_matrix = np.zeros((n_state, n_state*2+1))
    sqrt_P = sqrtm((n_state + scaling_factor) * init_P)
    sigma_point_matrix[:,0] = init_x.flatten()
    sigma_point_matrix[:,1:n_state+1] = init_x + sqrt_P
    sigma_point_matrix[:,n_state+1:] = init_x - sqrt_P
    return sigma_point_matrix

Фильтр Калмана без запаха просто расширяет исходное состояние с помощью шумовых терминов Q, R. Ниже приведен пример реализации. Родительский класс является исходным объектом фильтра Калмана.

class unscented_kalman_filter(kalman_filter):
    def __init__(self, x_init, F, process_noise, observation_noise, H, obs_dim, B=None, u=None, sd=np.array([[0]])):
        super().__init__(x_init, F, process_noise, observation_noise, H, B, u, sd)
        self.x_dim = x_init.shape[0]
        self.Q_shape = process_noise.shape[0]
        self.R_shape = observation_noise.shape[0]
        self.aug_dim = self.x_dim + self.Q_shape, self.R_shape
        self.x_aug = np.concatenate([x_init, np.zeros((process_noise.shape[0],1))])
        self.P_aug = np.eye(self.x_aug.shape[0])
        self.P_aug[:self.x_dim,:self.x_dim] = self.P_post
        self.P_aug[self.x_dim:self.x_dim+self.Q_shape,self.x_dim:self.x_dim+self.Q_shape] = process_noise
        self.P_aug[self.x_dim+self.Q_shape:, self.x_dim+self.Q_shape:] = observation_noise
        self.scaling_factor = get_scaling_factor(self.aug_dim, kappa=1)
        self.weights = get_weights(self.x_dim, self.scaling_factor)
        self.chi_limit = chi2.ppf(0.95, obs_dim)


    def _update_aug(self):
        self.x_aug[:self.x_dim] = self.X_post
        self.P_aug[:self.x_dim,:self.x_dim] = self.P_post


    def fit(self, data, num_state=1, fit_step=None):
        self.num_state = num_state
        fit_step = fit_step if fit_step is not None and fit_step < data.shape[1] else data.shape[1]
        if self.u.shape[0] == 1:
            self.u = np.zeros((fit_step, 1))

        for i in range(fit_step):
            z_k = data[:, i]
            self._update_aug()
            self.sigma_points = unscented_transform(self.x_aug, self.P_aug, self.scaling_factor)
            # predict
            self._predict(self.X_post, self.P_post)
            self._update(z_k)
            # update
            self.mean = np.append(self.mean, self.X_post)
            self.covar = np.append(self.covar, self.P_post)

    def _predict(self, X_post, P_post):
        self.transformed_sigma = np.apply_along_axis(self.F, 0, self.sigma_points['sigma_points'])
        self.X_prior = np.dot(self.weights, self.transformed_sigma)
        self.P_prior = np.dot(self.weights, np.dot((self.transformed_sigma - self.X_prior),(self.transformed_sigma - self.X_prior).T))

    def _update(self, observation):
        transformed_observation = np.apply_along_axis(self.H, 0, self.transformed_sigma)
        resid = observation - np.dot(self.weights, transformed_observation)
        temp = transformed_observation - np.dot(self.weights, transformed_observation)
        S_k = np.dot(temp, temp.T) + self.R
        cross_correlation = np.dot((self.transformed_sigma - self.X_prior), temp.T)
        K_k = np.dot(np.dot(self.weights, cross_correlation), np.linalg.inv(S_k))
        self.X_post = self.X_prior + np.dot(K_k, resid)
        self.P_post = self.P_prior - np.dot(np.dot(K_k, S_k), K_k.T)


    def normalized_innovation_squared(self, P_prior, S, P_post):
        temp = P_prior - P_post
        return np.dot(np.dot(temp.T, np.linalg.inv(S)), temp)

И в FilterPy, и в PyKalman реализован класс unscented, поэтому нет необходимости писать его с нуля.

С той же настройкой модели, что и в предыдущем посте здесь. Мы получаем следующий результат, используя фильтр Калмана без запаха. Как и раньше, первые 10 итераций удаляются.

    result_set = {}
    graph_text = ""
    x_init = np.ones((4, 1))
    def x_new(inputs, *args):
        return_mat = np.zeros_like(inputs)
        return_mat[0] = inputs[0]
        return_mat[1] = -inputs[1]*np.cos(inputs[2]) - inputs[3] * np.sin(inputs[2])
        return_mat[2] = inputs[2]
        return_mat[3] = inputs[3]
        return return_mat

    def y_new(inputs, *args):
        return inputs[0] + inputs[1]*inputs[0]

    def F_partial(inputs):
        return_mat = np.eye(inputs.shape[0])
        return_mat[1,1:] = [-np.cos(inputs[2]), inputs[1]*np.sin(inputs[2])-inputs[3]*np.cos(inputs[2]),
                            -np.sin(inputs[2])]
        return return_mat

    def H_partial(inputs):
        return np.array([inputs[1,0],inputs[0,0],0,0]).reshape(1,4)


    def L_partial(inputs):
        return np.eye(inputs.shape[0])

    def M_partial(inputs):
        return 1

    Q = np.eye(4)
    R = 1
    kf = extend_kalman_filter(x_init, x_new, y_new, F_partial, Q, R, H_partial,
                 L_partial, M_partial
                 , B=None, u=None,  sd=Q)


    n_train = int(len(data) * 0.9)
    n_test = len(data) - n_train
    kf.fit(observation[:,:n_train], 4, n_train)
    fittedvalues = kf.get_observation()
    pred_ = kf.predict(n_test)
    result_set['KF Extend'] = np.append(fittedvalues, pred_)
    graph_text += "\nKF Extend rmse: " + "{:.4f}".format(rmse(observation[0,n_train:], pred_))


    def transition_function(state, noise=[0, 0]):
        return_mat = np.zeros_like(state)
        return_mat[0] = state[0] + noise[0]
        return_mat[1] = -state[1] * np.cos(state[2]) - state[3] * np.sin(state[2]) 
        return_mat[2] = state[2]
        return_mat[3] = state[3]
        return return_mat


    def observation_function(state, noise=[0]):
        return state[0] + state[1]*state[0] + noise


    # sample from model
    kf = UnscentedKalmanFilter(
        transition_function, observation_function,
        Q, R, x_init.flatten()
    )

    filtered_x = kf.filter(observation[:,:n_train])[0]
    filtered_obs = np.apply_along_axis(observation_function, 1, filtered_x)

    filtered_pred = forecast_kf(n_test, filtered_x[-1], transition_function, observation_function, 1)

    result_set['KF Unscented'] = np.append(filtered_obs, filtered_pred)
    graph_text += "\nKF Unscented: " + "{:.4f}".format(rmse(observation[0,n_train:], filtered_pred))

    plot_comparison(truth=observation.reshape(-1,1), observation=[n_train], model_result=result_set
                    , total_len=n_test + n_train, title="SPY Comparison", file_name='SPY_unscented.png', x=data.index,
                    additional_text=graph_text,init_pos=10)
    plot_comparison(truth=observation.reshape(-1, 1), observation=[n_train], model_result=result_set
                    , total_len=n_test + n_train, title="SPY Comparison", file_name='SPY_unscented_test.png', x=data.index,
                    additional_text=graph_text, init_pos=n_train)

Сосредоточив внимание на периоде прогнозирования, неароматизированный производит прогноз, который выше, чем продление. В нескольких временных шагах есть небольшое изменение (незаметное), но оно быстро становится постоянным предсказанием, подобным расширенному фильтру.

На этом посты, в которых говорится о фильтре Калмана как об отдельном инструменте, заканчиваются. В следующем посте мы поговорим о том, как использовать фильтр Калмана в качестве инструмента обнаружения выбросов.

Спасибо за чтение. Пожалуйста, следите и подпишитесь, если вам нравятся эти посты.