Введение
В прошлом посте мы обсуждали расширенный фильтр Калмана, в этом посте мы рассмотрим неароматизированный фильтр Калмана (UKF) — еще один метод, когда модель наблюдения и/или перехода нелинейна.
Метод называется неароматизированным, потому что применяется неароматизированное преобразование. Когда система нелинейна, мы могли бы взять набор точек сигмы вокруг априора и с определенным отношением к стандартному отклонению применить преобразование и получить набор точек, принадлежащих пространству модели. Затем мы берем среднее значение и дисперсию набора точек и используем их в качестве приближения для истинного среднего и истинной дисперсии системы. Чтобы убедиться, что ваш код работает правильно, спроектируйте линейную систему и примените как UKF, так и KF. У них должен быть один и тот же ответ.
Есть три шага для UKF. Более подробную информацию об UKF можно найти здесь
- Генерация сигма-баллов
- Как предсказать точки сигмы
- Вычислите прогнозируемое среднее значение и ковариацию по прогнозируемым сигма-точкам
Шаг 1: Создайте баллы Sigma
Количество сигма-точек зависит от количества состояний. (размер X в стандартной форме)
две точки для каждого состояния (распределенные по разным направлениям) и дополнительная точка для среднего значения состояний.
значение точек сигмы зависит от предыдущего значения и параметра масштабирования, который определяет отношение к ошибочному затмению (расстояние от среднего).
Обычно мы принимаем каппа за 0, а альфа за небольшую переменную.
Преобразование без запаха закодировано ниже
def unscented_transform(scaling_factor, init_x, init_P): n_state = init_x.shape[0] sigma_point_matrix = np.zeros((n_state, n_state*2+1)) sqrt_P = sqrtm((n_state + scaling_factor) * init_P) sigma_point_matrix[:,0] = init_x.flatten() sigma_point_matrix[:,1:n_state+1] = init_x + sqrt_P sigma_point_matrix[:,n_state+1:] = init_x - sqrt_P return sigma_point_matrix
Фильтр Калмана без запаха просто расширяет исходное состояние с помощью шумовых терминов Q, R. Ниже приведен пример реализации. Родительский класс является исходным объектом фильтра Калмана.
class unscented_kalman_filter(kalman_filter): def __init__(self, x_init, F, process_noise, observation_noise, H, obs_dim, B=None, u=None, sd=np.array([[0]])): super().__init__(x_init, F, process_noise, observation_noise, H, B, u, sd) self.x_dim = x_init.shape[0] self.Q_shape = process_noise.shape[0] self.R_shape = observation_noise.shape[0] self.aug_dim = self.x_dim + self.Q_shape, self.R_shape self.x_aug = np.concatenate([x_init, np.zeros((process_noise.shape[0],1))]) self.P_aug = np.eye(self.x_aug.shape[0]) self.P_aug[:self.x_dim,:self.x_dim] = self.P_post self.P_aug[self.x_dim:self.x_dim+self.Q_shape,self.x_dim:self.x_dim+self.Q_shape] = process_noise self.P_aug[self.x_dim+self.Q_shape:, self.x_dim+self.Q_shape:] = observation_noise self.scaling_factor = get_scaling_factor(self.aug_dim, kappa=1) self.weights = get_weights(self.x_dim, self.scaling_factor) self.chi_limit = chi2.ppf(0.95, obs_dim) def _update_aug(self): self.x_aug[:self.x_dim] = self.X_post self.P_aug[:self.x_dim,:self.x_dim] = self.P_post def fit(self, data, num_state=1, fit_step=None): self.num_state = num_state fit_step = fit_step if fit_step is not None and fit_step < data.shape[1] else data.shape[1] if self.u.shape[0] == 1: self.u = np.zeros((fit_step, 1)) for i in range(fit_step): z_k = data[:, i] self._update_aug() self.sigma_points = unscented_transform(self.x_aug, self.P_aug, self.scaling_factor) # predict self._predict(self.X_post, self.P_post) self._update(z_k) # update self.mean = np.append(self.mean, self.X_post) self.covar = np.append(self.covar, self.P_post) def _predict(self, X_post, P_post): self.transformed_sigma = np.apply_along_axis(self.F, 0, self.sigma_points['sigma_points']) self.X_prior = np.dot(self.weights, self.transformed_sigma) self.P_prior = np.dot(self.weights, np.dot((self.transformed_sigma - self.X_prior),(self.transformed_sigma - self.X_prior).T)) def _update(self, observation): transformed_observation = np.apply_along_axis(self.H, 0, self.transformed_sigma) resid = observation - np.dot(self.weights, transformed_observation) temp = transformed_observation - np.dot(self.weights, transformed_observation) S_k = np.dot(temp, temp.T) + self.R cross_correlation = np.dot((self.transformed_sigma - self.X_prior), temp.T) K_k = np.dot(np.dot(self.weights, cross_correlation), np.linalg.inv(S_k)) self.X_post = self.X_prior + np.dot(K_k, resid) self.P_post = self.P_prior - np.dot(np.dot(K_k, S_k), K_k.T) def normalized_innovation_squared(self, P_prior, S, P_post): temp = P_prior - P_post return np.dot(np.dot(temp.T, np.linalg.inv(S)), temp)
И в FilterPy, и в PyKalman реализован класс unscented, поэтому нет необходимости писать его с нуля.
С той же настройкой модели, что и в предыдущем посте здесь. Мы получаем следующий результат, используя фильтр Калмана без запаха. Как и раньше, первые 10 итераций удаляются.
result_set = {} graph_text = "" x_init = np.ones((4, 1)) def x_new(inputs, *args): return_mat = np.zeros_like(inputs) return_mat[0] = inputs[0] return_mat[1] = -inputs[1]*np.cos(inputs[2]) - inputs[3] * np.sin(inputs[2]) return_mat[2] = inputs[2] return_mat[3] = inputs[3] return return_mat def y_new(inputs, *args): return inputs[0] + inputs[1]*inputs[0] def F_partial(inputs): return_mat = np.eye(inputs.shape[0]) return_mat[1,1:] = [-np.cos(inputs[2]), inputs[1]*np.sin(inputs[2])-inputs[3]*np.cos(inputs[2]), -np.sin(inputs[2])] return return_mat def H_partial(inputs): return np.array([inputs[1,0],inputs[0,0],0,0]).reshape(1,4) def L_partial(inputs): return np.eye(inputs.shape[0]) def M_partial(inputs): return 1 Q = np.eye(4) R = 1 kf = extend_kalman_filter(x_init, x_new, y_new, F_partial, Q, R, H_partial, L_partial, M_partial , B=None, u=None, sd=Q) n_train = int(len(data) * 0.9) n_test = len(data) - n_train kf.fit(observation[:,:n_train], 4, n_train) fittedvalues = kf.get_observation() pred_ = kf.predict(n_test) result_set['KF Extend'] = np.append(fittedvalues, pred_) graph_text += "\nKF Extend rmse: " + "{:.4f}".format(rmse(observation[0,n_train:], pred_)) def transition_function(state, noise=[0, 0]): return_mat = np.zeros_like(state) return_mat[0] = state[0] + noise[0] return_mat[1] = -state[1] * np.cos(state[2]) - state[3] * np.sin(state[2]) return_mat[2] = state[2] return_mat[3] = state[3] return return_mat def observation_function(state, noise=[0]): return state[0] + state[1]*state[0] + noise # sample from model kf = UnscentedKalmanFilter( transition_function, observation_function, Q, R, x_init.flatten() ) filtered_x = kf.filter(observation[:,:n_train])[0] filtered_obs = np.apply_along_axis(observation_function, 1, filtered_x) filtered_pred = forecast_kf(n_test, filtered_x[-1], transition_function, observation_function, 1) result_set['KF Unscented'] = np.append(filtered_obs, filtered_pred) graph_text += "\nKF Unscented: " + "{:.4f}".format(rmse(observation[0,n_train:], filtered_pred)) plot_comparison(truth=observation.reshape(-1,1), observation=[n_train], model_result=result_set , total_len=n_test + n_train, title="SPY Comparison", file_name='SPY_unscented.png', x=data.index, additional_text=graph_text,init_pos=10) plot_comparison(truth=observation.reshape(-1, 1), observation=[n_train], model_result=result_set , total_len=n_test + n_train, title="SPY Comparison", file_name='SPY_unscented_test.png', x=data.index, additional_text=graph_text, init_pos=n_train)
Сосредоточив внимание на периоде прогнозирования, неароматизированный производит прогноз, который выше, чем продление. В нескольких временных шагах есть небольшое изменение (незаметное), но оно быстро становится постоянным предсказанием, подобным расширенному фильтру.
На этом посты, в которых говорится о фильтре Калмана как об отдельном инструменте, заканчиваются. В следующем посте мы поговорим о том, как использовать фильтр Калмана в качестве инструмента обнаружения выбросов.
Спасибо за чтение. Пожалуйста, следите и подпишитесь, если вам нравятся эти посты.