Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/kalman_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ def run_example():
# Define the initial state to be slightly off of actual
x_guess = {'x': 1.75, 'v': 35} # Guess of initial state, actual is {'x': 1.83, 'v': 40}
kf = KalmanFilter(m, x_guess)
# You can also change the process and measurement noise for the filter by specifying them in the constructor.
# This is used for the case where you want to apply different noise in the filter
# By default, they are set to the model noise.

# Step 3: Run the Kalman Filter State Estimator
# Here we're using simulated data from the thrown_object. In a real application you would be using sensor data from the system
Expand Down
50 changes: 44 additions & 6 deletions src/prog_algs/state_estimators/kalman_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ class KalmanFilter(state_estimator.StateEstimator):
Starting time (s)
dt : float
time step (s)
Q : List[List[float]]
Process Noise Matrix
R : List[List[float]]
Measurement Noise Matrix
process_noise : 2darray[float]
Process Noise Matrix (n_states, n_states) - default model.process_noise
measurement_noise : 2darray[float]
Measurement Noise Matrix (n_measurements x n_measurements) - default model.measurement_noise

Note:
The Kalman Filter does not support a custom measurement function
Expand All @@ -46,17 +46,55 @@ def __init__(self, model, x0, measurement_eqn : Callable = None, **kwargs):
super().__init__(model, x0, None, **kwargs)

self.x0 = x0

if 'Q' not in self.parameters:
self.parameters['Q'] = np.diag([1.0e-3 for i in x0.keys()])
if 'R' not in self.parameters:
# Size of what's being measured (not output)
# This is determined by running the measure function on the first state
self.parameters['R'] = np.diag([1.0e-3 for i in range(model.n_outputs)])

num_states = len(x0.keys())
num_states = model.n_states
num_inputs = model.n_inputs + 1
num_measurements = model.n_outputs

# Process Noise (Q)
# Users can use process_noise (like in prog_models) or Q (like in filterpy). They're synced.
if 'Q' in self.parameters:
self.parameters['process_noise'] = self.parameters['Q']

if 'process_noise' not in self.parameters:
if 'process_noise' in model.parameters:
self.parameters['process_noise'] = np.diag([model.parameters['process_noise'][key] for key in x0.keys()])
else:
self.parameters['process_noise'] = np.diag([1.0e-3 for _ in range(num_states)])
else:
# Manage type
if isinstance(self.parameters['process_noise'], list):
self.parameters['process_noise'] = np.array(self.parameters['process_noise'])

# Check size
if self.parameters['process_noise'].shape != (num_states, num_states):
raise Exception('process_noise must be a square matrix with size equal to the number of states')

# Measurement Noise (R)
# Users can use measurement_noise (like in prog_models) or R (like in filterpy). They're synced.
if 'R' in self.parameters:
self.parameters['measurement_noise'] = self.parameters['R']
elif 'measurement_noise' not in self.parameters:
if 'measurement_noise' in model.parameters:
self.parameters['measurement_noise'] = np.diag([model.parameters['measurement_noise'][key] for key in model.outputs])
else:
self.parameters['measurement_noise'] = np.diag([1.0e-3 for _ in range(num_measurements)])
else:
# Manage type
if isinstance(self.parameters['measurement_noise'], list):
self.parameters['measurement_noise'] = np.array(self.parameters['measurement_noise'])

# Check size
if self.parameters['measurement_noise'].shape != (num_measurements, num_measurements):
raise Exception('measurement_noise must be a square matrix with size equal to the number of outputs')

F = deepcopy(model.A)
B = deepcopy(model.B)
if np.size(B) == 0:
Expand Down
64 changes: 54 additions & 10 deletions src/prog_algs/state_estimators/unscented_kalman_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ class UnscentedKalmanFilter(state_estimator.StateEstimator):
Starting time (s)
dt : float
time step (s)
Q : List[List[float]]
Process Noise Matrix
R : List[List[float]]
Measurement Noise Matrix
process_noise : 2darray[float]
Process Noise Matrix (n_states, n_states) - default model.process_noise
measurement_noise : 2darray[float]
Measurement Noise Matrix (n_measurements x n_measurements) - default model.measurement_noise
"""
default_parameters = {
'alpha': 1,
Expand Down Expand Up @@ -56,20 +56,64 @@ def measure(x):
z = measurement_eqn(x)
return array(list(z.values())).ravel()

if 'Q' not in self.parameters:
self.parameters['Q'] = diag([1.0e-3 for i in x0.keys()])
num_states = model.n_states

# Process Noise (Q)
# Users can use process_noise (like in prog_models) or Q (like in filterpy). They're synced.
if 'Q' in self.parameters:
self.parameters['process_noise'] = self.parameters['Q']
elif 'process_noise' not in self.parameters:
# Not provided
if 'process_noise' in model.parameters:
# If model has process noise, use it
self.parameters['process_noise'] = diag([model.parameters['process_noise'][key] for key in x0.keys()])
else:
self.parameters['process_noise'] = diag([1.0e-3 for _ in range(num_states)])
else:
# If process noise is provided

# Manage type
if isinstance(self.parameters['process_noise'], list):
self.parameters['process_noise'] = array(self.parameters['process_noise'])

# Check dims
if self.parameters['process_noise'].shape != (num_states, num_states):
raise Exception('process_noise must be a square matrix with size equal to the number of states')

# Measurement Noise (R)
# Users can use measurement_noise (like in prog_models) or R (like in filterpy). They're synced.
if 'R' in self.parameters:
self.parameters['measurement_noise'] = self.parameters['R']
elif 'measurement_noise' not in self.parameters:
# Not provided
if 'measurement_noise' in model.parameters and measurement_eqn is None:
# Pull from model noise (doesn't work when measurement equation is provided)
self.parameters['measurement_noise'] = diag([model.parameters['measurement_noise'][key] for key in model.outputs])
else:
# Default to 1.0e-3 standard deviation on every output
model.parameters['measurement_noise'] = 0
num_measurements = len(measure(x0.values()))
self.parameters['measurement_noise'] = diag([1.0e-3 for _ in range(num_measurements)])
else:
# Manage type
if isinstance(self.parameters['measurement_noise'], list):
self.parameters['measurement_noise'] = array(self.parameters['measurement_noise'])

# Check dims
num_measurements = len(measure(x0.values()))
if self.parameters['measurement_noise'].shape != (num_measurements, num_measurements):
raise Exception('measurement_noise must be a square matrix with size equal to the number of outputs')

num_measurements = model.n_outputs

def state_transition(x, dt):
x = {key: value for (key, value) in zip(x0.keys(), x)}
Q_err = model.parameters['process_noise'].copy()
model.parameters['process_noise'] = dict.fromkeys(Q_err, 0)
z = model.output(x)
model.parameters['process_noise'] = Q_err
x = model.next_state(x, self.__input, dt)
model.parameters['process_noise'] = Q_err
return array(list(x.values())).ravel()

num_states = len(x0.keys())
num_measurements = model.n_outputs
points = kalman.MerweScaledSigmaPoints(num_states, alpha=self.parameters['alpha'], beta=self.parameters['beta'], kappa=self.parameters['kappa'])
self.filter = kalman.UnscentedKalmanFilter(num_states, num_measurements, self.parameters['dt'], measure, state_transition, points)

Expand Down
80 changes: 80 additions & 0 deletions tests/test_state_estimators.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,47 @@ def __test_state_est(self, filt, m):

def test_UKF(self):
from prog_models.models import ThrownObject
self.__test_state_est(UnscentedKalmanFilter, ThrownObject)

# Check default
m = ThrownObject()
kf = UnscentedKalmanFilter(m, m.initialize())
self.assertTrue((kf.filter.Q == np.diag([m.parameters['process_noise'][key] for key in m.states])).all())
self.assertTrue((kf.filter.R == np.diag([m.parameters['measurement_noise'][key] for key in m.outputs])).all())

# Without model parameters
del m.parameters['process_noise']
del m.parameters['measurement_noise']
kf = UnscentedKalmanFilter(m, m.initialize())
self.assertTrue((kf.filter.Q == np.diag([0.001 for _ in m.states])).all())
self.assertTrue((kf.filter.R == np.diag([0.001 for _ in m.outputs])).all())

# Specifying Explicitly
Q = np.array([[0.1, 1e-3], [1e-3, 0.1]])
R = np.array([[0.2]])
kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R)
self.assertTrue((kf.filter.Q == Q).all())
self.assertTrue((kf.filter.R == R).all())

# Using Lists
Q = [[0.1, 1e-3], [1e-3, 0.1]]
R = [[0.2]]
kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R)
self.assertTrue((kf.filter.Q == Q).all())
self.assertTrue((kf.filter.R == R).all())

# Wrong size - Q
Q = [[0.1, 1e-3, 1e-4], [1e-3, 0.1, 1e-4], [1e-4, 1e-3, 0.1]]
R = [[0.2]]
with self.assertRaises(Exception):
kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R)

# Wrong size - R
Q = [[0.1, 1e-3], [1e-3, 0.1]]
R = [[0.2, 0.3]]
with self.assertRaises(Exception):
kf = UnscentedKalmanFilter(m, m.initialize(), Q = Q, R = R)

from prog_algs.state_estimators import UnscentedKalmanFilter

m = ThrownObject(process_noise=5e-2, measurement_noise=5e-2)
Expand Down Expand Up @@ -416,6 +457,45 @@ def event_state(self, x):
# Missing states
KalmanFilter(ThrownObject, {})

# Check default
m = ThrownObject()
kf = KalmanFilter(m, m.initialize())
self.assertTrue((kf.filter.Q == np.diag([m.parameters['process_noise'][key] for key in m.states])).all())
self.assertTrue((kf.filter.R == np.diag([m.parameters['measurement_noise'][key] for key in m.outputs])).all())

# Without model parameters
del m.parameters['process_noise']
del m.parameters['measurement_noise']
kf = KalmanFilter(m, m.initialize())
self.assertTrue((kf.filter.Q == np.diag([0.001 for _ in m.states])).all())
self.assertTrue((kf.filter.R == np.diag([0.001 for _ in m.outputs])).all())

# Specifying Explicitly
Q = np.array([[0.1, 1e-3], [1e-3, 0.1]])
R = np.array([[0.2]])
kf = KalmanFilter(m, m.initialize(), Q = Q, R = R)
self.assertTrue((kf.filter.Q == Q).all())
self.assertTrue((kf.filter.R == R).all())

# Using Lists
Q = [[0.1, 1e-3], [1e-3, 0.1]]
R = [[0.2]]
kf = KalmanFilter(m, m.initialize(), Q = Q, R = R)
self.assertTrue((kf.filter.Q == Q).all())
self.assertTrue((kf.filter.R == R).all())

# Wrong size - Q
Q = [[0.1, 1e-3, 1e-4], [1e-3, 0.1, 1e-4], [1e-4, 1e-3, 0.1]]
R = [[0.2]]
with self.assertRaises(Exception):
kf = KalmanFilter(m, m.initialize(), Q = Q, R = R)

# Wrong size - R
Q = [[0.1, 1e-3], [1e-3, 0.1]]
R = [[0.2, 0.3]]
with self.assertRaises(Exception):
kf = KalmanFilter(m, m.initialize(), Q = Q, R = R)

# This allows the module to be executed directly
def run_tests():
# This ensures that the directory containing StateEstimatorTemplate is in the python search directory
Expand Down