# Copyright 2018 The Texar Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Various reward related functions.
"""
import numpy as np
import tensorflow as tf

from texar.tf.utils.shapes import mask_sequences

# pylint: disable=invalid-name, too-many-arguments, no-member

__all__ = [
    "discount_reward",
    "_discount_reward_py_1d",
    "_discount_reward_tensor_1d",
    "_discount_reward_py_2d",
    "_discount_reward_tensor_2d",
]
def discount_reward(reward,
                    sequence_length=None,
                    discount=1.,
                    normalize=False,
                    dtype=None,
                    tensor_rank=1):
"""Computes discounted reward.
:attr:`reward` and :attr:`sequence_length` can be either Tensors or python
arrays. If both are python array (or `None`), the return will be a python
array as well. Otherwise tf Tensors are returned.
Args:
reward: A Tensor or python array. Can be 1D with shape `[batch_size]`,
or 2D with shape `[batch_size, max_time]`.
sequence_length (optional): A Tensor or python array of shape
`[batch_size]`. Time steps beyond the respective sequence lengths
will be masked. Required if :attr:`reward` is 1D.
discount (float): A scalar. The discount factor.
normalize (bool): Whether to normalize the discounted reward, by
`(discounted_reward - mean) / std`. Here `mean` and `std` are
over all time steps and all samples in the batch.
dtype (dtype): Type of :attr:`reward`. If `None`, infer from
`reward` automatically.
tensor_rank (int): The number of dimensions of :attr:`reward`.
Default is 1, i.e., :attr:`reward` is a 1D Tensor consisting
of a batch dimension. Ignored if :attr:`reward`
and :attr:`sequence_length` are python arrays (or `None`).
Returns:
A 2D Tensor or python array of the discounted reward.
If :attr:`reward` and :attr:`sequence_length` are python
arrays (or `None`), the returned value is a python array as well.
Example:
.. code-block:: python
r = [2., 1.]
seq_length = [3, 2]
discounted_r = discount_reward(r, seq_length, discount=0.1)
# discounted_r == [[2. * 0.1^2, 2. * 0.1, 2.],
# [1. * 0.1, 1., 0.]]
r = [[3., 4., 5.], [6., 7., 0.]]
seq_length = [3, 2]
discounted_r = discount_reward(r, seq_length, discount=0.1)
# discounted_r == [[3. + 4.*0.1 + 5.*0.1^2, 4. + 5.*0.1, 5.],
# [6. + 7.*0.1, 7., 0.]]
"""
    is_tensor = tf.contrib.framework.is_tensor
    if is_tensor(reward) or is_tensor(sequence_length):
        if tensor_rank == 1:
            disc_reward = _discount_reward_tensor_1d(
                reward, sequence_length, discount, dtype)
        elif tensor_rank == 2:
            disc_reward = _discount_reward_tensor_2d(
                reward, sequence_length, discount, dtype)
        else:
            raise ValueError("`tensor_rank` can only be 1 or 2.")

        if normalize:
            mu, var = tf.nn.moments(disc_reward, axes=[0, 1], keep_dims=True)
            disc_reward = (disc_reward - mu) / (tf.sqrt(var) + 1e-8)
    else:
        reward = np.array(reward)
        tensor_rank = reward.ndim
        if tensor_rank == 1:
            disc_reward = _discount_reward_py_1d(
                reward, sequence_length, discount, dtype)
        elif tensor_rank == 2:
            disc_reward = _discount_reward_py_2d(
                reward, sequence_length, discount, dtype)
        else:
            raise ValueError("`reward` can only be 1D or 2D.")

        if normalize:
            mu = np.mean(disc_reward)
            std = np.std(disc_reward)
            disc_reward = (disc_reward - mu) / (std + 1e-8)

    return disc_reward
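

# A minimal usage sketch (hypothetical names, not part of this module): in a
# REINFORCE-style update, the discounted reward typically weights the
# per-step log-likelihood of the sampled sequence, e.g.:
#
#     disc_r = discount_reward(ep_rewards, seq_lengths,
#                              discount=0.99, normalize=True)
#     loss = tf.reduce_mean(-log_probs * disc_r)
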
def _discount_reward_py_1d(reward, sequence_length, discount=1., dtype=None):
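    """Numpy implementation of discounting for a 1D reward.

    Each scalar reward is spread over its sequence with factors
    `[discount^(l-1), ..., discount, 1]`, where `l` is the sequence length.
    A minimal sketch of the expected behavior (values mirror the public
    docstring example of :func:`discount_reward`):

    .. code-block:: python

        _discount_reward_py_1d(np.array([2., 1.]), [3, 2], discount=0.1)
        # -> [[0.02, 0.2, 2.],
        #     [0.1,  1.,  0.]]
    """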
    if sequence_length is None:
        raise ValueError('sequence_length must not be `None` for 1D reward.')

    reward = np.array(reward)
    sequence_length = np.array(sequence_length)

    batch_size = reward.shape[0]
    max_seq_length = np.max(sequence_length)
    dtype = dtype or reward.dtype

    if discount == 1.:
        dmat = np.ones([batch_size, max_seq_length], dtype=dtype)
    else:
        steps = np.tile(np.arange(max_seq_length), [batch_size, 1])
        mask = np.asarray(steps < (sequence_length - 1)[:, None], dtype=dtype)
        # Make each row = [discount, ..., discount, 1, ..., 1]
        dmat = mask * discount + (1 - mask)
        # Right-to-left cumulative product turns each row into
        # [discount^(l-1), ..., discount, 1, 1, ..., 1].
        dmat = np.cumprod(dmat[:, ::-1], axis=1)[:, ::-1]

    disc_reward = dmat * reward[:, None]
    disc_reward = mask_sequences(disc_reward, sequence_length, dtype=dtype)
    return disc_reward


def _discount_reward_tensor_1d(reward, sequence_length,
                               discount=1., dtype=None):
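    """Tensor counterpart of :func:`_discount_reward_py_1d`.

    A minimal graph-mode sketch (TF1 semantics, as used throughout
    Texar-TF; values mirror the public docstring example of
    :func:`discount_reward`):

    .. code-block:: python

        r = tf.constant([2., 1.])
        lengths = tf.constant([3, 2])
        disc = _discount_reward_tensor_1d(r, lengths, discount=0.1)
        # session.run(disc) -> [[0.02, 0.2, 2.],
        #                       [0.1,  1.,  0.]]
    """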
    if sequence_length is None:
        raise ValueError('sequence_length must not be `None` for 1D reward.')

    batch_size = tf.shape(reward)[0]
    max_seq_length = tf.reduce_max(sequence_length)
    dtype = dtype or reward.dtype

    if discount == 1.:
        dmat = tf.ones(
            tf.concat([[batch_size], [max_seq_length]], 0), dtype=dtype)
    else:
        mask = tf.sequence_mask(sequence_length, dtype=dtype)
        # Shift the mask left by one step so the last valid step gets
        # factor 1.
        mask = tf.concat([mask[:, 1:], tf.zeros_like(mask[:, -1:])], axis=1)
        # Make each row = [discount, ..., discount, 1, ..., 1]
        dmat = mask * discount + (1 - mask)
        # Reversed cumprod turns each row into
        # [discount^(l-1), ..., discount, 1, 1, ..., 1].
        dmat = tf.cumprod(dmat, axis=1, reverse=True)

    disc_reward = dmat * tf.expand_dims(reward, -1)
    disc_reward = mask_sequences(
        disc_reward, sequence_length, dtype=dtype, tensor_rank=2)

    return disc_reward


def _discount_reward_py_2d(reward, sequence_length=None,
                           discount=1., dtype=None):
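    """Numpy implementation of discounting for a 2D reward.

    Entry `[i, t]` becomes `sum_{t' >= t} discount^(t'-t) * reward[i, t']`.
    A minimal sketch (values mirror the public docstring example of
    :func:`discount_reward`):

    .. code-block:: python

        r = np.array([[3., 4., 5.], [6., 7., 0.]])
        _discount_reward_py_2d(r, [3, 2], discount=0.1)
        # -> [[3.45, 4.5, 5.],
        #     [6.7,  7.,  0.]]
    """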
    if sequence_length is not None:
        reward = mask_sequences(reward, sequence_length, dtype=dtype)

    dtype = dtype or reward.dtype

    if discount == 1.:
        # Undiscounted: a reversed cumulative sum over time.
        disc_reward = np.cumsum(
            reward[:, ::-1], axis=1, dtype=dtype)[:, ::-1]
    else:
        # Backward recursion: R[t] = r[t] + discount * R[t + 1].
        disc_reward = np.copy(reward)
        for i in range(reward.shape[1] - 2, -1, -1):
            disc_reward[:, i] += disc_reward[:, i + 1] * discount

    return disc_reward


def _discount_reward_tensor_2d(reward, sequence_length=None,
                               discount=1., dtype=None):
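    """Tensor counterpart of :func:`_discount_reward_py_2d`.

    The discounted cumulative sum is computed with :func:`tf.scan` over the
    time-reversed reward. A minimal graph-mode sketch (values mirror the
    public docstring example of :func:`discount_reward`):

    .. code-block:: python

        r = tf.constant([[3., 4., 5.], [6., 7., 0.]])
        disc = _discount_reward_tensor_2d(
            r, tf.constant([3, 2]), discount=0.1)
        # session.run(disc) -> [[3.45, 4.5, 5.],
        #                       [6.7,  7.,  0.]]
    """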
    if sequence_length is not None:
        reward = mask_sequences(
            reward, sequence_length, dtype=dtype, tensor_rank=2)

    if discount == 1.:
        disc_reward = tf.cumsum(reward, axis=1, reverse=True)
    else:
        # [max_time, batch_size]
        rev_reward_T = tf.transpose(tf.reverse(reward, [1]), [1, 0])
        # Backward recursion R[t] = r[t] + discount * R[t + 1], computed as
        # a forward scan over the time-reversed reward.
        rev_reward_T_cum = tf.scan(
            fn=lambda acc, cur: cur + discount * acc,
            elems=rev_reward_T,
            # Any single column has shape `[batch_size]`; use column 0 so
            # the initializer is valid even when `max_time == 1`.
            initializer=tf.zeros_like(reward[:, 0]),
            back_prop=False)
        disc_reward = tf.reverse(
            tf.transpose(rev_reward_T_cum, [1, 0]), [1])

    return disc_reward