import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from dataclasses import dataclass
from collections.abc import Callable
import abc
import warnings
from vbll.utils.distributions import Normal, DenseNormal, get_parameterization
def KL(p, q_scale):
feat_dim = p.mean.shape[-1]
mse_term = (p.mean ** 2).sum(-1).sum(-1) / q_scale
trace_term = (p.trace_covariance / q_scale).sum(-1)
logdet_term = (feat_dim * np.log(q_scale) - p.logdet_covariance).sum(-1)
return 0.5*(mse_term + trace_term + logdet_term) # currently exclude constant
@dataclass
class VBLLReturn():
predictive: Normal | DenseNormal # Could return distribution or mean/cov
train_loss_fn: Callable[[torch.Tensor], torch.Tensor]
val_loss_fn: Callable[[torch.Tensor], torch.Tensor]
ood_scores: None | Callable[[torch.Tensor], torch.Tensor] = None
[docs]
class DiscClassification(nn.Module):
def __init__(self,
in_features,
out_features,
regularization_weight,
parameterization='dense',
softmax_bound='jensen',
return_ood=False,
prior_scale=1.,
wishart_scale=1.,
dof=1.):
"""Initailize a DiscClassification model.
:param in_features: Number of input features
:param out_features: Number of output features
:param regularization_weight: Weight of regularization term
:param parameterization: Parameterization of the last layer distribution.
:param softmax_bound: Bound to use for softmax.
:param return_ood: Whether to return OOD scores.
:param prior_scale: Scale of the prior.
:param wishart_scale: Scale of the Wishart distribution.
:param dof: Degrees of freedom of the Wishart distribution.
"""
super(DiscClassification, self).__init__()
self.wishart_scale = wishart_scale
self.dof = (dof + out_features + 1.)/2.
self.regularization_weight = regularization_weight
# define prior, currently fixing zero mean and arbitrarily scaled cov
self.prior_scale = prior_scale
# noise distribution
self.noise_mean = nn.Parameter(torch.zeros(out_features), requires_grad = False)
self.noise_logdiag = nn.Parameter(torch.randn(out_features) - 1)
# last layer distribution
self.W_dist = get_parameterization(parameterization)
self.W_mean = nn.Parameter(torch.randn(out_features, in_features))
self.W_logdiag = nn.Parameter(torch.randn(out_features, in_features) - 0.5 * np.log(in_features))
if parameterization == 'dense':
self.W_offdiag = nn.Parameter(torch.randn(out_features, in_features, in_features)/in_features)
if softmax_bound == 'jensen':
self.softmax_bound = self.jensen_bound
self.return_ood = return_ood
def noise_chol(self):
return torch.exp(self.noise_logdiag)
def W_chol(self):
out = torch.exp(self.W_logdiag)
if self.W_dist == DenseNormal:
out = torch.tril(self.W_offdiag, diagonal=-1) + torch.diag_embed(out)
return out
def W(self):
return self.W_dist(self.W_mean, self.W_chol())
def noise(self):
return Normal(self.noise_mean, self.noise_chol())
# ----- bounds
def adaptive_bound(self, x, y):
# TODO(jamesharrison)
raise NotImplementedError('Adaptive bound not currently implemented')
def jensen_bound(self, x, y):
pred = self.logit_predictive(x)
linear_term = pred.mean[torch.arange(x.shape[0]), y]
pre_lse_term = pred.mean + 0.5 * pred.covariance_diagonal
lse_term = torch.logsumexp(pre_lse_term, dim=-1)
return linear_term - lse_term
def montecarlo_bound(self, x, y, n_samples=10):
sampled_log_sm = F.log_softmax(self.logit_predictive(x).rsample(sample_shape=torch.Size([n_samples])), dim=-1)
mean_over_samples = torch.mean(sampled_log_sm, dim=0)
return mean_over_samples[torch.arange(x.shape[0]), y]
# ----- forward and core ops
def forward(self, x):
# TODO(jamesharrison): add assert on shape of x input
out = VBLLReturn(torch.distributions.Categorical(probs = self.predictive(x)),
self._get_train_loss_fn(x),
self._get_val_loss_fn(x))
if self.return_ood: out.ood_scores = self.max_predictive(x)
return out
def logit_predictive(self, x):
return (self.W() @ x[..., None]).squeeze(-1) + self.noise()
def predictive(self, x, n_samples = 10):
softmax_samples = F.softmax(self.logit_predictive(x).rsample(sample_shape=torch.Size([n_samples])), dim=-1)
return torch.clip(torch.mean(softmax_samples, dim=0),min=0.,max=1.)
def _get_train_loss_fn(self, x):
def loss_fn(y):
noise = self.noise()
kl_term = KL(self.W(), self.prior_scale)
wishart_term = (self.dof * noise.logdet_precision - 0.5 * self.wishart_scale * noise.trace_precision)
total_elbo = torch.mean(self.softmax_bound(x, y))
total_elbo += self.regularization_weight * (wishart_term - kl_term)
return -total_elbo
return loss_fn
def _get_val_loss_fn(self, x):
def loss_fn(y):
return -torch.mean(torch.log(self.predictive(x)[torch.arange(x.shape[0]), y]))
return loss_fn
# ----- OOD metrics
def max_predictive(self, x):
return torch.max(self.predictive(x), dim=-1)[0]
[docs]
class GenClassification(nn.Module):
# TODO(jamesharrison): add dirichlet
def __init__(self,
in_features,
out_features,
regularization_weight,
parameterization='dense',
softmax_bound='jensen',
return_ood=False,
prior_scale=1.,
wishart_scale=1.,
dof=1.):
"""Initailize a GenClassification model.
:param in_features: Number of input features
:param out_features: Number of output features
:param regularization_weight: Weight of regularization term
:param parameterization: Parameterization of the last layer distribution.
:param softmax_bound: Bound to use for softmax.
:param return_ood: Whether to return OOD scores.
:param prior_scale: Scale of the prior.
:param wishart_scale: Scale of the Wishart distribution.
:param dof: Degrees of freedom of the Wishart distribution.
"""
super(GenClassification, self).__init__()
self.wishart_scale = wishart_scale
self.dof = (dof + in_features + 1.)/2.
self.regularization_weight = regularization_weight
# define prior, currently fixing zero mean and arbitrarily scaled cov
self.prior_scale = prior_scale
# noise distribution
self.noise_mean = nn.Parameter(torch.zeros(in_features), requires_grad = False)
self.noise_logdiag = nn.Parameter(torch.randn(in_features))
# last layer distribution
self.mu_dist = get_parameterization(parameterization)
self.mu_mean = nn.Parameter(0.1*torch.randn(out_features, in_features))
self.mu_logdiag = nn.Parameter(torch.randn(out_features, in_features))
if parameterization == 'dense':
raise NotImplementedError('Dense embedding cov not implemented for g-vbll')
if softmax_bound == 'jensen':
self.softmax_bound = self.jensen_bound
self.return_ood = return_ood
def noise_chol(self):
return torch.exp(self.noise_logdiag)
def mu_chol(self):
out = torch.exp(self.mu_logdiag)
# TODO(jamesharrison): add impl for dense/low rank cov
return out
def mu(self):
return self.mu_dist(self.mu_mean, self.mu_chol())
def noise(self):
return Normal(self.noise_mean, self.noise_chol())
# ----- bounds
def adaptive_bound(self, x, y):
# TODO(jamesharrison)
raise NotImplementedError('Adaptive bound not implemented for g-vbll')
def jensen_bound(self, x, y):
linear_pred = self.noise() + self.mu_mean[y]
linear_term = linear_pred.log_prob(x)
if isinstance(linear_pred, Normal):
# Is there a more elegant way to handle this?
linear_term = linear_term.sum(-1)
trace_term = (self.mu().covariance_diagonal[y] / self.noise().covariance_diagonal).sum(-1)
pre_lse_term = self.logit_predictive(x)
lse_term = torch.logsumexp(pre_lse_term, dim=-1)
return linear_term - 0.5 * trace_term - lse_term
def montecarlo_bound(self, x, y, n_samples=10):
# TODO(jamesharrison)
raise NotImplementedError('Monte carlo bound not implemented for g-vbll')
# ----- forward and core ops
def forward(self, x):
# TODO(jamesharrison): add assert on shape of x input
out = VBLLReturn(torch.distributions.Categorical(probs = self.predictive(x)),
self._get_train_loss_fn(x),
self._get_val_loss_fn(x))
if self.return_ood: out.ood_scores = self.max_predictive(x)
return out
def logit_predictive(self, x):
# likelihood of x under marginalized
logprob = (self.mu() + self.noise()).log_prob(x.unsqueeze(-2))
if isinstance(self.mu(), Normal):
# Is there a more elegant way to handle this?
logprob = logprob.sum(-1)
return logprob
def predictive(self, x):
return torch.clip(F.softmax(self.logit_predictive(x), dim=-1), min=0., max=1.)
def _get_train_loss_fn(self, x):
def loss_fn(y):
# print('computing loss fn')
noise = self.noise()
kl_term = KL(self.mu(), self.prior_scale)
wishart_term = (self.dof * noise.logdet_precision - 0.5 * self.wishart_scale * noise.trace_precision)
total_elbo = torch.mean(self.softmax_bound(x, y))
total_elbo += self.regularization_weight * (wishart_term - kl_term)
return -total_elbo
return loss_fn
def _get_val_loss_fn(self, x):
def loss_fn(y):
return -torch.mean(torch.log(self.predictive(x)[np.arange(x.shape[0]), y]))
return loss_fn
# ----- OOD metrics
def max_predictive(self, x):
return torch.max(self.predictive(x), dim=-1)[0]