Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 149 additions & 69 deletions brian2/monitors/ratemonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,135 @@
Module defining `PopulationRateMonitor`.
"""

from abc import ABC, abstractmethod

import numpy as np

from brian2.core.clocks import Clock
from brian2.core.variables import Variables
from brian2.groups.group import CodeRunner, Group
from brian2.units.allunits import hertz, second
from brian2.units.fundamentalunits import Quantity, check_units
from brian2.utils.logger import get_logger

__all__ = ["PopulationRateMonitor"]
__all__ = ["PopulationRateMonitor", "RateMoniter"]


logger = get_logger(__name__)


class PopulationRateMonitor(Group, CodeRunner):
class RateMoniter(CodeRunner, Group, Clock, ABC):
"""
Abstract base class for monitors that record rates.
"""

@abstractmethod
@check_units(bin_size=second)
def binned(self, bin_size):
"""
Return the rate calculated in bins of a certain size.

Parameters
-------------
bin_size : `Quantity`
The size of the bins in seconds. Should be a multiple of dt.

Returns
-------
bins : `Quantity`
The midpoints of the bins.
binned_values : `Quantity`
The binned values. For EventMonitor subclasses, this is a 2D array
with shape (neurons, bins). For PopulationRateMonitor, this is a 1D array.
"""
raise NotImplementedError()

@check_units(width=second)
def smooth_rates(self, window="gaussian", width=None):
"""
Returns a smooted out version of the firing rate(s).

Parameters
----------
window : str, ndarray
The window to use for smoothing. Can be a string to chose a
predefined window(`flat` for a rectangular, and `gaussian`
for a Gaussian-shaped window).

In this case the width of the window
is determined by the `width` argument. Note that for the Gaussian
window, the `width` parameter specifies the standard deviation of
the Gaussian, the width of the actual window is `4*width + dt`
(rounded to the nearest dt). For the flat window, the width is
rounded to the nearest odd multiple of dt to avoid shifting the rate
in time.
Alternatively, an arbitrary window can be given as a numpy array
(with an odd number of elements). In this case, the width in units
of time depends on the ``dt`` of the simulation, and no `width`
argument can be specified. The given window will be automatically
normalized to a sum of 1.
width : `Quantity`, optional
The width of the ``window`` in seconds (for a predefined window).

Returns
-------
rate : `Quantity`
The smoothed firing rate(s) in Hz. For EventMonitor subclasses,
this returns a 2D array with shape (neurons, time_bins).
For PopulationRateMonitor, this returns a 1D array.
Note that the rates are smoothed and not re-binned, i.e. the length
of the returned array is the same as the length of the binned data
and can be plotted against the bin centers from the ``binned`` method.
"""
if width is None and isinstance(window, str):
raise TypeError("Need a width when using a predefined window.")
if width is not None and not isinstance(window, str):
raise TypeError("Can only specify a width for a predefined window")

if isinstance(window, str):
if window == "gaussian":
# basically Gaussian theoretically spans infinite time, but practically it falls off quickly,
# So we choose a window of +- 2*(Standard deviations), i.e 95% of gaussian curve
width_dt = int(
np.round(2 * width / self.clock.dt)
) # Rounding only for the size of the window, not for the standard
# deviation of the Gaussian
window = np.exp(
-np.arange(-width_dt, width_dt + 1) ** 2
* 1.0 # hack to ensure floating-point division :)
/ (2 * (width / self.clock.dt) ** 2)
)
elif window == "flat":
width_dt = int(np.round(width / (2 * self.clock.dt))) * 2 + 1
used_width = width_dt * self.clock.dt
if abs(used_width - width) > 1e-6 * self.clock.dt:
logger.info(
f"width adjusted from {width} to {used_width}",
"adjusted_width",
once=True,
)
window = np.ones(width_dt)
else:
raise NotImplementedError(f'Unknown pre-defined window "{window}"')
else:
try:
window = np.asarray(window)
except TypeError:
raise TypeError(f"Cannot use a window of type {type(window)}")
if window.ndim != 1:
raise TypeError("The provided window has to be one-dimensional.")
if len(window) % 2 != 1:
raise TypeError("The window has to have an odd number of values.")

# Get the binned rates at the finest resolution
_, binned_values = self.binned(bin_size=self.clock.dt)

# The actual smoothing
smoothed = np.convolve(binned_values, window * 1.0 / sum(window), mode="same")
return Quantity(smoothed, dim=hertz.dim)


class PopulationRateMonitor(RateMoniter):
"""
Record instantaneous firing rates, averaged across neurons from a
`NeuronGroup` or other spike source.
Expand Down Expand Up @@ -100,82 +214,48 @@ def reinit(self):
"""
raise NotImplementedError()

@check_units(width=second)
def smooth_rate(self, window="gaussian", width=None):
@check_units(bin_size=second)
def binned(self, bin_size):
"""
smooth_rate(self, window='gaussian', width=None)

Return a smooth version of the population rate.
Return the population rate binned with the given bin size.

Parameters
----------
window : str, ndarray
The window to use for smoothing. Can be a string to chose a
predefined window(``'flat'`` for a rectangular, and ``'gaussian'``
for a Gaussian-shaped window). In this case the width of the window
is determined by the ``width`` argument. Note that for the Gaussian
window, the ``width`` parameter specifies the standard deviation of
the Gaussian, the width of the actual window is ``4*width + dt``
(rounded to the nearest dt). For the flat window, the width is
rounded to the nearest odd multiple of dt to avoid shifting the rate
in time.
Alternatively, an arbitrary window can be given as a numpy array
(with an odd number of elements). In this case, the width in units
of time depends on the ``dt`` of the simulation, and no ``width``
argument can be specified. The given window will be automatically
normalized to a sum of 1.
width : `Quantity`, optional
The width of the ``window`` in seconds (for a predefined window).
bin_size : `Quantity`
The size of the bins in seconds. Should be a multiple of dt.

Returns
-------
rate : `Quantity`
The population rate in Hz, smoothed with the given window. Note that
the rates are smoothed and not re-binned, i.e. the length of the
returned array is the same as the length of the ``rate`` attribute
and can be plotted against the `PopulationRateMonitor` 's ``t``
attribute.
bins : `Quantity`
The midpoints of the bins.
binned_values : `Quantity`
The binned population rates as a 1D array in Hz.
"""
if width is None and isinstance(window, str):
raise TypeError("Need a width when using a predefined window.")
if width is not None and not isinstance(window, str):
raise TypeError("Can only specify a width for a predefined window")
if (
bin_size / self.clock.dt
) % 1 > 1e-6: # to make sure bin_size is an integer multiple of the internal time resolution dt
raise ValueError("bin_size has to be a multiple of dt.")
if bin_size == self.clock.dt:
return self.t[:], self.rate

if isinstance(window, str):
if window == "gaussian":
width_dt = int(np.round(2 * width / self.clock.dt))
# Rounding only for the size of the window, not for the standard
# deviation of the Gaussian
window = np.exp(
-np.arange(-width_dt, width_dt + 1) ** 2
* 1.0
/ (2 * (width / self.clock.dt) ** 2)
)
elif window == "flat":
width_dt = int(width / 2 / self.clock.dt) * 2 + 1
used_width = width_dt * self.clock.dt
if abs(used_width - width) > 1e-6 * self.clock.dt:
logger.info(
f"width adjusted from {width} to {used_width}",
"adjusted_width",
once=True,
)
window = np.ones(width_dt)
else:
raise NotImplementedError(f'Unknown pre-defined window "{window}"')
else:
try:
window = np.asarray(window)
except TypeError:
raise TypeError(f"Cannot use a window of type {type(window)}")
if window.ndim != 1:
raise TypeError("The provided window has to be one-dimensional.")
if len(window) % 2 != 1:
raise TypeError("The window has to have an odd number of values.")
return Quantity(
np.convolve(self.rate_, window * 1.0 / sum(window), mode="same"),
dim=hertz.dim,
)
num_bins = int(self.clock.t / bin_size)
bins = (
np.arange(num_bins) * bin_size + bin_size / 2.0
) # as we want Bin centers (not edges)

t_indices = (self.t / self.clock.dt).astype(int)
bin_indices = (t_indices * self.clock.dt / bin_size).astype(int)

binned_values = np.zeros(num_bins) # to store total firing rate values per bin
bin_counts = np.zeros(num_bins) # to store how many samples went into each bin

np.add.at(binned_values, bin_indices, self.rate)
np.add.at(bin_counts, bin_indices, 1)

# Avoid division by zero for empty bins
non_empty_bins = bin_counts > 0
binned_values[non_empty_bins] /= bin_counts[non_empty_bins]
return bins, Quantity(binned_values, dim=hertz.dim)

def __repr__(self):
classname = self.__class__.__name__
Expand Down
Loading