#!/usr/bin/env python3
# pyre-strict
from typing import Any, Callable, Optional, Tuple, Union
import numpy as np
import torch
from captum._utils.common import _format_tensor_into_tuples
from captum._utils.typing import BaselineType, TargetType, TensorOrTupleOfTensorsGeneric
from captum.attr._core.feature_ablation import FeatureAblation
from captum.attr._utils.common import (
_format_and_verify_sliding_window_shapes,
_format_and_verify_strides,
)
from captum.log import log_usage
from torch import Tensor
[docs]
class Occlusion(FeatureAblation):
r"""
A perturbation based approach to compute attribution, involving
replacing each contiguous rectangular region with a given baseline /
reference, and computing the difference in output. For features located
in multiple regions (hyperrectangles), the corresponding output differences
are averaged to compute the attribution for that feature.
The first patch is applied with the corner aligned with all indices 0,
and strides are applied until the entire dimension range is covered. Note
that this may cause the final patch applied in a direction to be cut-off
and thus smaller than the target occlusion shape.
More details regarding the occlusion (or grey-box / sliding window)
method can be found in the original paper and in the DeepExplain
implementation.
https://arxiv.org/abs/1311.2901
https://github.com/marcoancona/DeepExplain/blob/master/deepexplain\
/tensorflow/methods.py#L401
"""
def __init__(self, forward_func: Callable[..., Tensor]) -> None:
r"""
Args:
forward_func (Callable): The forward function of the model or
any modification of it.
"""
FeatureAblation.__init__(self, forward_func)
self.use_weights = True
[docs]
@log_usage()
def attribute( # type: ignore
self,
inputs: TensorOrTupleOfTensorsGeneric,
sliding_window_shapes: Union[Tuple[int, ...], Tuple[Tuple[int, ...], ...]],
strides: Union[
None, int, Tuple[int, ...], Tuple[Union[int, Tuple[int, ...]], ...]
] = None,
baselines: BaselineType = None,
target: TargetType = None,
additional_forward_args: Optional[object] = None,
perturbations_per_eval: int = 1,
show_progress: bool = False,
) -> TensorOrTupleOfTensorsGeneric:
r"""
Args:
inputs (Tensor or tuple[Tensor, ...]): Input for which occlusion
attributions are computed. If forward_func takes a single
tensor as input, a single input tensor should be provided.
If forward_func takes multiple tensors as input, a tuple
of the input tensors should be provided. It is assumed
that for all given input tensors, dimension 0 corresponds
to the number of examples (aka batch size), and if
multiple input tensors are provided, the examples must
be aligned appropriately.
sliding_window_shapes (tuple or tuple[tuple]): Shape of patch
(hyperrectangle) to occlude each input. For a single
input tensor, this must be a tuple of length equal to the
number of dimensions of the input tensor - 1, defining
the dimensions of the patch. If the input tensor is 1-d,
this should be an empty tuple. For multiple input tensors,
this must be a tuple containing one tuple for each input
tensor defining the dimensions of the patch for that
input tensor, as described for the single tensor case.
strides (int, tuple, tuple[int], or tuple[tuple], optional):
This defines the step by which the occlusion hyperrectangle
should be shifted by in each direction for each iteration.
For a single tensor input, this can be either a single
integer, which is used as the step size in each direction,
or a tuple of integers matching the number of dimensions
in the occlusion shape, defining the step size in the
corresponding dimension. For multiple tensor inputs, this
can be either a tuple of integers, one for each input
tensor (used for all dimensions of the corresponding
tensor), or a tuple of tuples, providing the stride per
dimension for each tensor.
To ensure that all inputs are covered by at least one
sliding window, the stride for any dimension must be
<= the corresponding sliding window dimension if the
sliding window dimension is less than the input
dimension.
If None is provided, a stride of 1 is used for each
dimension of each input tensor.
Default: None
baselines (scalar, Tensor, tuple of scalar, or Tensor, optional):
Baselines define reference value which replaces each
feature when occluded.
Baselines can be provided as:
- a single tensor, if inputs is a single tensor, with
exactly the same dimensions as inputs or
broadcastable to match the dimensions of inputs
- a single scalar, if inputs is a single tensor, which will
be broadcasted for each input value in input tensor.
- a tuple of tensors or scalars, the baseline corresponding
to each tensor in the inputs' tuple can be:
- either a tensor with matching dimensions to
corresponding tensor in the inputs' tuple
or the first dimension is one and the remaining
dimensions match with the corresponding
input tensor.
- or a scalar, corresponding to a tensor in the
inputs' tuple. This scalar value is broadcasted
for corresponding input tensor.
In the cases when `baselines` is not provided, we internally
use zero scalar corresponding to each input tensor.
Default: None
target (int, tuple, Tensor, or list, optional): Output indices for
which difference is computed (for classification cases,
this is usually the target class).
If the network returns a scalar value per example,
no target index is necessary.
For general 2D outputs, targets can be either:
- a single integer or a tensor containing a single
integer, which is applied to all input examples
- a list of integers or a 1D tensor, with length matching
the number of examples in inputs (dim 0). Each integer
is applied as the target for the corresponding example.
For outputs with > 2 dimensions, targets can be either:
- A single tuple, which contains #output_dims - 1
elements. This target index is applied to all examples.
- A list of tuples with length equal to the number of
examples in inputs (dim 0), and each tuple containing
#output_dims - 1 elements. Each tuple is applied as the
target for the corresponding example.
Default: None
additional_forward_args (Any, optional): If the forward function
requires additional arguments other than the inputs for
which attributions should not be computed, this argument
can be provided. It must be either a single additional
argument of a Tensor or arbitrary (non-tuple) type or a
tuple containing multiple additional arguments including
tensors or any arbitrary python types. These arguments
are provided to forward_func in order following the
arguments in inputs.
For a tensor, the first dimension of the tensor must
correspond to the number of examples. For all other types,
the given argument is used for all forward evaluations.
Note that attributions are not computed with respect
to these arguments.
Default: None
perturbations_per_eval (int, optional): Allows multiple occlusions
to be included in one batch (one call to forward_fn).
By default, perturbations_per_eval is 1, so each occlusion
is processed individually.
Each forward pass will contain a maximum of
perturbations_per_eval * #examples samples.
For DataParallel models, each batch is split among the
available devices, so evaluations on each available
device contain at most
(perturbations_per_eval * #examples) / num_devices
samples.
Default: 1
show_progress (bool, optional): Displays the progress of computation.
It will try to use tqdm if available for advanced features
(e.g. time estimation). Otherwise, it will fallback to
a simple output of progress.
Default: False
Returns:
*Tensor* or *tuple[Tensor, ...]* of **attributions**:
- **attributions** (*Tensor* or *tuple[Tensor, ...]*):
The attributions with respect to each input feature.
Attributions will always be
the same size as the provided inputs, with each value
providing the attribution of the corresponding input index.
If a single tensor is provided as inputs, a single tensor is
returned. If a tuple is provided for inputs, a tuple of
corresponding sized tensors is returned.
Examples::
>>> # SimpleClassifier takes a single input tensor of size Nx4x4,
>>> # and returns an Nx3 tensor of class probabilities.
>>> net = SimpleClassifier()
>>> # Generating random input with size 2 x 4 x 4
>>> input = torch.randn(2, 4, 4)
>>> # Defining Occlusion interpreter
>>> ablator = Occlusion(net)
>>> # Computes occlusion attribution, ablating each 3x3 patch,
>>> # shifting in each direction by the default of 1.
>>> attr = ablator.attribute(input, target=1, sliding_window_shapes=(3,3))
"""
formatted_inputs = _format_tensor_into_tuples(inputs)
# Formatting strides
strides = _format_and_verify_strides(strides, formatted_inputs)
# Formatting sliding window shapes
sliding_window_shapes = _format_and_verify_sliding_window_shapes(
sliding_window_shapes, formatted_inputs
)
# Construct tensors from sliding window shapes
sliding_window_tensors = tuple(
torch.ones(window_shape, device=formatted_inputs[i].device)
for i, window_shape in enumerate(sliding_window_shapes)
)
# Construct counts, defining number of steps to make of occlusion block in
# each dimension.
shift_counts = []
for i, inp in enumerate(formatted_inputs):
current_shape = np.subtract(inp.shape[1:], sliding_window_shapes[i])
# Verify sliding window doesn't exceed input dimensions.
assert (np.array(current_shape) >= 0).all(), (
"Sliding window dimensions {} cannot exceed input dimensions" "{}."
).format(sliding_window_shapes[i], tuple(inp.shape[1:]))
# Stride cannot be larger than sliding window for any dimension where
# the sliding window doesn't cover the entire input.
assert np.logical_or(
np.array(current_shape) == 0,
np.array(strides[i]) <= sliding_window_shapes[i],
).all(), (
"Stride dimension {} cannot be larger than sliding window "
"shape dimension {}."
).format(
strides[i], sliding_window_shapes[i]
)
shift_counts.append(
tuple(
np.add(np.ceil(np.divide(current_shape, strides[i])).astype(int), 1)
)
)
# Use ablation attribute method
return super().attribute.__wrapped__(
self,
inputs,
baselines=baselines,
target=target,
additional_forward_args=additional_forward_args,
perturbations_per_eval=perturbations_per_eval,
sliding_window_tensors=sliding_window_tensors,
shift_counts=tuple(shift_counts),
strides=strides,
show_progress=show_progress,
)
# pyre-fixme[24] Generic type `Callable` expects 2 type parameters.
[docs]
def attribute_future(self) -> Callable:
r"""
This method is not implemented for Occlusion.
"""
raise NotImplementedError("attribute_future is not implemented for Occlusion")
def _construct_ablated_input(
self,
expanded_input: Tensor,
input_mask: Union[None, Tensor, Tuple[Tensor, ...]],
baseline: Union[None, float, Tensor],
start_feature: int,
end_feature: int,
**kwargs: Any,
) -> Tuple[Tensor, Tensor]:
r"""
Ablates given expanded_input tensor with given feature mask, feature range,
and baselines, and any additional arguments.
expanded_input shape is (num_features, num_examples, ...)
with remaining dimensions corresponding to remaining original tensor
dimensions and num_features = end_feature - start_feature.
input_mask is None for occlusion, and the mask is constructed
using sliding_window_tensors, strides, and shift counts, which are provided in
kwargs. baseline is expected to
be broadcastable to match expanded_input.
This method returns the ablated input tensor, which has the same
dimensionality as expanded_input as well as the corresponding mask with
either the same dimensionality as expanded_input or second dimension
being 1. This mask contains 1s in locations which have been ablated (and
thus counted towards ablations for that feature) and 0s otherwise.
"""
input_mask = torch.stack(
[
self._occlusion_mask(
expanded_input,
j,
kwargs["sliding_window_tensors"],
kwargs["strides"],
kwargs["shift_counts"],
)
for j in range(start_feature, end_feature)
],
dim=0,
).long()
assert baseline is not None, "baseline should not be None"
ablated_tensor = (
expanded_input
* (
torch.ones(1, dtype=torch.long, device=expanded_input.device)
- input_mask
).to(expanded_input.dtype)
# pyre-fixme[58]: `*` is not supported for operand types `Union[None, float,
# Tensor]` and `Tensor`.
) + (baseline * input_mask.to(expanded_input.dtype))
return ablated_tensor, input_mask
def _occlusion_mask(
self,
expanded_input: Tensor,
ablated_feature_num: int,
sliding_window_tsr: Tensor,
strides: Union[int, Tuple[int, ...]],
shift_counts: Tuple[int, ...],
) -> Tensor:
"""
This constructs the current occlusion mask, which is the appropriate
shift of the sliding window tensor based on the ablated feature number.
The feature number ranges between 0 and the product of the shift counts
(# of times the sliding window should be shifted in each dimension).
First, the ablated feature number is converted to the number of steps in
each dimension from the origin, based on shift counts. This procedure
is similar to a base conversion, with the position values equal to shift
counts. The feature number is first taken modulo shift_counts[0] to
get the number of shifts in the first dimension (each shift
by shift_count[0]), and then divided by shift_count[0].
The procedure is then continued for each element of shift_count. This
computes the total shift in each direction for the sliding window.
We then need to compute the padding required after the window in each
dimension, which is equal to the total input dimension minus the sliding
window dimension minus the (left) shift amount. We construct the
array pad_values which contains the left and right pad values for each
dimension, in reverse order of dimensions, starting from the last one.
Once these padding values are computed, we pad the sliding window tensor
of 1s with 0s appropriately, which is the corresponding mask,
and the result will match the input shape.
"""
remaining_total = ablated_feature_num
current_index = []
for i, shift_count in enumerate(shift_counts):
stride = strides[i] if isinstance(strides, tuple) else strides
current_index.append((remaining_total % shift_count) * stride)
remaining_total = remaining_total // shift_count
remaining_padding = np.subtract(
expanded_input.shape[2:], np.add(current_index, sliding_window_tsr.shape)
)
pad_values = [
val for pair in zip(remaining_padding, current_index) for val in pair
]
pad_values.reverse()
padded_tensor = torch.nn.functional.pad(
sliding_window_tsr, tuple(pad_values) # type: ignore
)
return padded_tensor.reshape((1,) + tuple(padded_tensor.shape))
def _get_feature_range_and_mask(
self, input: Tensor, input_mask: Optional[Tensor], **kwargs: Any
) -> Tuple[int, int, Union[None, Tensor, Tuple[Tensor, ...]]]:
feature_max = int(np.prod(kwargs["shift_counts"]))
return 0, feature_max, None
def _get_feature_counts(
self,
inputs: TensorOrTupleOfTensorsGeneric,
feature_mask: Tuple[Tensor, ...],
**kwargs: Any,
) -> Tuple[int, ...]:
"""return the numbers of possible input features"""
return tuple(np.prod(counts).astype(int) for counts in kwargs["shift_counts"])