"""Handles shared memory usage to accelerate memory accesses for CUDA
"""
# Standard libraries
import os
from math import floor
# Local imports
from .. import utils
from . import CUDAParams
[docs]class variable(object):
"""
Class that represents an array/index pair. Used for in the internal
dicitonary of the `shared_memory_manager` for identification and
tracking of `variable` usage for eviction.
"""
def __init__(self, base, index, lang='cuda'):
"""
Creates a `variable` with given base and index
Parameters
----------
base : str
The name of the array
index : int
The index in the array
"""
self.base = base
self.index = index
self.last_use_count = 0
self.lang = lang
def __eq__(self, other):
"""Tests `variable` equality"""
if self.index is None:
return self.base == other.base
return self.base == other.base and self.index == other.index
[docs] def reset(self):
"""Reset the usage count of this `variable`
"""
self.last_use_count = 0
[docs] def update(self):
"""Increment the usage count of this `variable`
"""
self.last_use_count += 1
[docs] def to_string(self):
"""Converts this `variable` to a string representation
"""
if self.index is None:
return self.base
return utils.get_array(self.lang, self.base, self.index)
[docs]class shared_memory_manager(object):
"""Manager for GPU shared memory.
"""
def __init__(self, blocks_per_sm=8, num_threads=64, L1_PREFERRED=True):
"""Creates a shared memory manager
Parameters
----------
blocks_per_sm : int, optional
The number of blocks / streaming multiprocessor to target
num_threads : int, optional
The number of threads / block expected in kernel launches
L1_PREFERRED : bool, optional
Whether or not to prefer a larger L1 cache over more shared memory
(recommended).
Notes
-----
For ease, a single SMM is used in the entire program.
Thus this class has methods for setting the state/behaviour
(e.g., `reset`, and `set_on_eviction`).
"""
SHARED_MEMORY_SIZE = CUDAParams.get_shared_size(L1_PREFERRED)
self.blocks_per_sm = blocks_per_sm
self.num_threads = num_threads
self.skeleton = 'shared_temp[{}]'
self.shared_dict = {}
self.shared_per_block = int(floor(SHARED_MEMORY_SIZE / self.blocks_per_sm))
self.shared_per_thread = int(floor(self.shared_per_block / self.num_threads))
self.shared_indexes = [True for i in range(self.shared_per_thread)]
self.eviction_marking = [False for i in range(self.shared_per_thread)]
self.on_eviction = None
self.self_eviction_strategy = lambda x: x.last_use_count >= 2
[docs] def force_eviction(self):
"""Forces eviction of the manager's internal dictionary.
Notes
-----
The internal dictionary will be reset, and (if supplied) the
`on_eviction` function will be called on each evicted entry.
"""
key_copy = [x for x in self.shared_dict.keys()]
for shared_index in key_copy:
self.evict(shared_index)
[docs] def evict_longest_gap(self):
"""Evicts entry in the internal dictionary the longest without use.
"""
if len(self.shared_dict):
ind = max((x for x in self.shared_dict if self.eviction_marking[x]),
key=lambda k: self.shared_dict[k].last_use_count
)
self.evict(ind)
[docs] def evict(self, shared_index):
"""Removes the entry at shared_index from the internal dictionary
Parameters
----------
shared_index : int
The key to remove from the internal dictionary
Notes
-----
If set, `on_eviction` will be called.
"""
var = self.shared_dict[shared_index]
del self.shared_dict[shared_index]
self.shared_indexes.append(shared_index)
self.eviction_marking[shared_index] = False
if self.on_eviction is not None:
self.on_eviction(var, self.__get_string(shared_index), shared_index)
[docs] def add_to_dictionary(self, val):
"""Adds the value to the next available dictionary location
Parameters
----------
val : `variable`
The value to add to the dictionary
"""
assert len(self.shared_indexes)
self.shared_dict[self.shared_indexes.pop()] = val
[docs] def set_on_eviction(self, func):
"""Sets a callback function that is called upon eviction of a variable from the
internal dictionary
Parameters
----------
func : `function`
Function that takes one arguement (the evicted variable)
Returns
-------
None
"""
self.on_eviction = func
[docs] def reset(self):
"""Resets the SMM for use by other methods/callers
Returns
-------
None
"""
self.shared_dict = {}
self.shared_indexes = list(range(self.shared_per_thread))
self.eviction_marking = [False for x in range(self.shared_per_thread)]
self.on_eviction = None
[docs] def write_init(self, file, indent=4):
"""Convenience method to define shared memory for CUDA
Parameters
----------
file : `File`
Open `File` object to write to
indent : int, optional
The number of spaces to use in the indent
Returns
-------
None
"""
file.write(''.join([' ' for i in range(indent)]) +
'extern volatile __shared__ double ' +
self.skeleton.format('') + utils.line_end['cuda']
)
[docs] def load_into_shared(self, file, variables, estimated_usage=None,
indent=2, load=True
):
"""The main SMM method, loads/evicts variables based upon estimated
usage and stagnancy.
Parameters
----------
file : `File`
Open `File` object to write to
variables : list of `variable`
List of variables to consider loading
estimated_usage : list of float, optional
If specified, these will be used to prioritize variable additon
indent : int, optional
The number of spaces to use in the indentation
load : bool, optional
If ``True`` (default), a load into the internal dictionary will be
written to the file. If ``False``, this will must be handled by
the calling routine.
Returns
-------
List of `bool` to indicate if variables are loaded in shared memory.
"""
#save old variables
old_index = []
old_variables = []
if len(self.shared_dict):
old_index, old_variables = zip(*self.shared_dict.items())
#update all the old variables usage counts
for x in old_variables:
x.update()
#check for self_eviction
if self.self_eviction_strategy is not None:
for ind, val in self.shared_dict.items():
#if qualifies for self eviction and not in current set
if self.self_eviction_strategy(val) and not val in variables:
self.eviction_marking[ind] = True
elif val in variables:
self.eviction_marking[ind] = False
#sort by usage if available
if estimated_usage is not None:
variables = [(x[1], estimated_usage[x[0]]) for x in
sorted(enumerate(variables),
key=lambda x: estimated_usage[x[0]], reverse=True)
]
#now update for new variables
for thevar in variables:
if estimated_usage is not None:
var, usage = thevar
else:
var = thevar
usage = None
#don't re-add if it's already in
if not var in self.shared_dict.values():
#skip barely used ones
if usage <= 1:
continue
#if we have something marked for eviction, now's the time
if (len(self.shared_dict) >= self.shared_per_thread and
self.eviction_marking.count(True)
):
self.evict_longest_gap()
#add it if possible
if len(self.shared_dict) < self.shared_per_thread:
self.add_to_dictionary(var)
if estimated_usage:
# add any usage = 1 ones if space
for var, usage in variables:
if not var in self.shared_dict.values():
if len(self.shared_dict) < self.shared_per_thread:
self.add_to_dictionary(var)
if load is True:
# need to write loads for any new vars
for ind, val in self.shared_dict.items():
if not val in old_variables:
file.write(' ' * indent + self.__get_string(ind) +
' = ' + val.to_string() +
utils.line_end['cuda']
)
return {k:(v not in old_variables)
for k, v in self.shared_dict.items()
}
[docs] def mark_for_eviction(self, variables):
"""Marks variables for possible eviction upon next load_into_shared call
Parameters
----------
variables : list of `variable`
List of variables to consider for eviction
"""
self.eviction_marking = [var in variables for var in
self.shared_dict.values()
]
def __get_string(self, index):
"""Convenience method to get correct GPU shared memory addressing
Parameters
----------
index : int
Index of GPU block.
Returns
-------
str
String with shared memory addressing.
"""
if index == 0:
return self.skeleton.format('threadIdx.x')
else:
return self.skeleton.format('threadIdx.x + '
'{} * blockDim.x'.format(index)
)
[docs] def get_index(self, var):
"""Checks to see if a variable is in the internal dictionary.
If so returns internal index and variable
Parameters
----------
var : `variable`
The variable to check
Returns
-------
our_ind : int
Index of variable in internal dictionary
our_var : `variable`
Variable found in internal dictionary
"""
our_ind, our_var = next((val for val in self.shared_dict.items()
if val[1] == var), (None, None)
)
return our_ind, our_var
[docs] def get_array(self, lang, thevar, index, twod=None):
"""A substitute for `utils.get_array`.
If the variable is in our internal dictionary returns shared memory
address, otherwise calls `utils.get_array`.
Parameters
----------
lang : {'c', 'cuda'}
Programming language.
thevar : `variable`
Variable of interest.
index : int
Index in the array.
twod : int
Not used in this function.
Returns
-------
name : str
String with indexed array.
"""
var = variable(thevar, index, lang)
our_ind, our_var = self.get_index(var)
if our_var is not None:
#mark as used
our_var.reset()
#and return the shared string
name = self.__get_string(our_ind)
else:
name = var.to_string()
return name