TensorDeviceType.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_TENSOR_TENSOR_DEVICE_TYPE_H
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_TYPE_H


namespace Eigen {

// Default device for the machine (typically a single cpu core)
struct DefaultDevice {
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return internal::aligned_malloc(num_bytes);
  }
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    internal::aligned_free(buffer);
  }
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
    ::memcpy(dst, src, n);
  }
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE size_t numThreads() const {
#ifndef __CUDA_ARCH__
    // Running on the host CPU
    return 1;
#else
    // Running on a CUDA device
    return 32;
#endif
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
#ifndef __CUDA_ARCH__
    // Running single threaded on the host CPU
    // Should return an enum that encodes the ISA supported by the CPU
    return 1;
#else
    // Running on a CUDA device
    return __CUDA_ARCH__ / 100;
#endif
  }
};
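
// Illustrative usage sketch (hypothetical snippet; assumes the unsupported
// Tensor module is included via <unsupported/Eigen/CXX11/Tensor>). Tensor
// expressions are evaluated on a device by assigning through
// TensorBase::device():
//
//   Eigen::Tensor<float, 2> a(64, 64), b(64, 64), c(64, 64);
//   a.setRandom(); b.setRandom();
//   Eigen::DefaultDevice device;
//   c.device(device) = a + b;  // evaluate a + b on the default (single core) device
//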


// Multiple cpu cores
// We should really use a thread pool here but first we need to find a portable thread pool library.
#ifdef EIGEN_USE_THREADS

// This defines an interface that ThreadPoolDevice can take to use
// custom thread pools underneath.
class ThreadPoolInterface {
 public:
  virtual void Schedule(std::function<void()> fn) = 0;

  virtual ~ThreadPoolInterface() {}
};

// The implementation of the ThreadPool type ensures that the Schedule method
// runs the functions it is provided in FIFO order when the scheduling is done
// by a single thread.
class ThreadPool : public ThreadPoolInterface {
 public:
  // Construct a pool that contains "num_threads" threads.
  explicit ThreadPool(int num_threads) {
    for (int i = 0; i < num_threads; i++) {
      threads_.push_back(new std::thread([this]() { WorkerLoop(); }));
    }
  }

  // Wait until all scheduled work has finished and then destroy the
  // set of threads.
  ~ThreadPool()
  {
    {
      // Wait for all work to get done.
      std::unique_lock<std::mutex> l(mu_);
      empty_.wait(l, [this]() { return pending_.empty(); });
      exiting_ = true;

      // Wakeup all waiters.
      for (auto w : waiters_) {
        w->ready = true;
        w->work = nullptr;
        w->cv.notify_one();
      }
    }

    // Wait for threads to finish.
    for (auto t : threads_) {
      t->join();
      delete t;
    }
  }

  // Schedule fn() for execution in the pool of threads. The functions are
  // executed in the order in which they are scheduled.
  void Schedule(std::function<void()> fn) {
    std::unique_lock<std::mutex> l(mu_);
    if (waiters_.empty()) {
      pending_.push_back(fn);
    } else {
      Waiter* w = waiters_.back();
      waiters_.pop_back();
      w->ready = true;
      w->work = fn;
      w->cv.notify_one();
    }
  }

 protected:
  void WorkerLoop() {
    std::unique_lock<std::mutex> l(mu_);
    Waiter w;
    while (!exiting_) {
      std::function<void()> fn;
      if (pending_.empty()) {
        // Wait for work to be assigned to me
        w.ready = false;
        waiters_.push_back(&w);
        w.cv.wait(l, [&w]() { return w.ready; });
        fn = w.work;
        w.work = nullptr;
      } else {
        // Pick up pending work
        fn = pending_.front();
        pending_.pop_front();
        if (pending_.empty()) {
          empty_.notify_all();
        }
      }
      if (fn) {
        mu_.unlock();
        fn();
        mu_.lock();
      }
    }
  }

 private:
  struct Waiter {
    std::condition_variable cv;
    std::function<void()> work;
    bool ready;
  };

  std::mutex mu_;
  std::vector<std::thread*> threads_;          // All threads
  std::vector<Waiter*> waiters_;               // Stack of waiting threads.
  std::deque<std::function<void()>> pending_;  // Queue of pending work
  std::condition_variable empty_;              // Signaled on pending_.empty()
  bool exiting_ = false;
};
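
// Illustrative usage sketch (hypothetical snippet, not part of this header's
// API surface): schedule closures on the pool and count completions with a
// standard atomic counter.
//
//   Eigen::ThreadPool pool(4);               // spawns 4 worker threads
//   std::atomic<int> counter(0);
//   for (int i = 0; i < 100; ++i) {
//     pool.Schedule([&counter]() { counter.fetch_add(1); });
//   }
//   // The ThreadPool destructor waits for all scheduled work to finish
//   // before joining the worker threads.
//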


// Notification is an object that allows a user to wait for another
// thread to signal a notification that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but only one caller may call Notify() on the object.
class Notification {
 public:
  Notification() : notified_(false) {}
  ~Notification() {}

  void Notify() {
    std::unique_lock<std::mutex> l(mu_);
    eigen_assert(!notified_);
    notified_ = true;
    cv_.notify_all();
  }

  void WaitForNotification() {
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [this]() { return notified_; } );
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool notified_;
};

// Runs an arbitrary function and then calls Notify() on the passed-in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapper
{
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    n->Notify();
  }
};

static EIGEN_STRONG_INLINE void wait_until_ready(Notification* n) {
  if (n) {
    n->WaitForNotification();
  }
}
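
// Illustrative usage sketch (hypothetical snippet): pair a Notification with
// the pool above to block until a scheduled task has run.
//
//   Eigen::ThreadPool pool(2);
//   Eigen::Notification done;
//   pool.Schedule([&done]() {
//     // ... do some work ...
//     done.Notify();
//   });
//   done.WaitForNotification();  // blocks until Notify() has been called
//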


// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, size_t num_cores) : pool_(pool), num_threads_(num_cores) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    internal::aligned_free(buffer);
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
    ::memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE size_t numThreads() const {
    return num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    std::function<void()> func =
        std::bind(&FunctionWrapper<Function, Args...>::run, n, f, args...);
    pool_->Schedule(func);
    return n;
  }
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    std::function<void()> func = std::bind(f, args...);
    pool_->Schedule(func);
  }

 private:
  ThreadPoolInterface* pool_;
  size_t num_threads_;
};
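
// Illustrative usage sketch (hypothetical snippet, requires compiling with
// EIGEN_USE_THREADS): evaluate a tensor expression on a pool of worker
// threads, and enqueue an arbitrary callable.
//
//   Eigen::ThreadPool pool(8);
//   Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/8);
//
//   Eigen::Tensor<float, 2> a(512, 512), b(512, 512), c(512, 512);
//   a.setRandom(); b.setRandom();
//   c.device(device) = a * b;            // element-wise product, multi-threaded
//
//   Eigen::Notification* n = device.enqueue([]() { /* ... */ });
//   Eigen::wait_until_ready(n);          // block until the callable has run
//   delete n;                            // the returned Notification is owned by the caller
//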

#endif


// GPU offloading
#ifdef EIGEN_USE_GPU

// This defines an interface that GPUDevice can take to use
// CUDA streams underneath.
class StreamInterface {
 public:
  virtual ~StreamInterface() {}

  virtual const cudaStream_t& stream() const = 0;
  virtual const cudaDeviceProp& deviceProperties() const = 0;

  // Allocate memory on the actual device where the computation will run
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};

static cudaDeviceProp* m_deviceProperties;
static bool m_devicePropInitialized = false;

static void initializeDeviceProp() {
  if (!m_devicePropInitialized) {
    int num_devices;
    cudaError_t status = cudaGetDeviceCount(&num_devices);
    assert(status == cudaSuccess);
    m_deviceProperties = new cudaDeviceProp[num_devices];
    for (int i = 0; i < num_devices; ++i) {
      status = cudaGetDeviceProperties(&m_deviceProperties[i], i);
      assert(status == cudaSuccess);
    }
    m_devicePropInitialized = true;
  }
}

static const cudaStream_t default_stream = cudaStreamDefault;

class CudaStreamDevice : public StreamInterface {
 public:
  // Use the default stream on the current device
  CudaStreamDevice() : stream_(&default_stream) {
    cudaGetDevice(&device_);
    initializeDeviceProp();
  }
  // Use the default stream on the specified device
  CudaStreamDevice(int device) : stream_(&default_stream), device_(device) {
    initializeDeviceProp();
  }
  // Use the specified stream. Note that it's the caller's responsibility to
  // ensure that the stream can run on the specified device. If no device is
  // specified, the code assumes that the stream is associated with the
  // current GPU device.
  CudaStreamDevice(const cudaStream_t* stream, int device = -1)
      : stream_(stream), device_(device) {
    if (device < 0) {
      cudaGetDevice(&device_);
    } else {
      int num_devices;
      cudaError_t err = cudaGetDeviceCount(&num_devices);
      assert(err == cudaSuccess);
      assert(device < num_devices);
      device_ = device;
    }
    initializeDeviceProp();
  }

  const cudaStream_t& stream() const { return *stream_; }
  const cudaDeviceProp& deviceProperties() const {
    return m_deviceProperties[device_];
  }
  virtual void* allocate(size_t num_bytes) const {
    cudaError_t err = cudaSetDevice(device_);
    assert(err == cudaSuccess);
    void* result;
    err = cudaMalloc(&result, num_bytes);
    assert(err == cudaSuccess);
    assert(result != NULL);
    return result;
  }
  virtual void deallocate(void* buffer) const {
    cudaError_t err = cudaSetDevice(device_);
    assert(err == cudaSuccess);
    assert(buffer != NULL);
    err = cudaFree(buffer);
    assert(err == cudaSuccess);
  }

 private:
  const cudaStream_t* stream_;
  int device_;
};
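
// Illustrative usage sketch (hypothetical snippet): wrap an existing CUDA
// stream so that computations can be scheduled on it (see the GpuDevice
// sketch further below for how the wrapper is then used).
//
//   cudaStream_t s;
//   cudaStreamCreate(&s);
//   Eigen::CudaStreamDevice stream_device(&s, /*device=*/0);
//   // ... schedule work through a GpuDevice built on top of stream_device ...
//   cudaStreamDestroy(s);
//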


struct GpuDevice {
  // The StreamInterface is not owned: the caller is
  // responsible for its initialization and eventual destruction.
  explicit GpuDevice(const StreamInterface* stream) : stream_(stream) {
    eigen_assert(stream);
  }

  // TODO(bsteiner): This is an internal API, we should not expose it.
  EIGEN_STRONG_INLINE const cudaStream_t& stream() const {
    return stream_->stream();
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
#ifndef __CUDA_ARCH__
    return stream_->allocate(num_bytes);
#else
    eigen_assert(false && "The default device should be used instead to generate kernel code");
    return NULL;
#endif
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
#ifndef __CUDA_ARCH__
    stream_->deallocate(buffer);
#else
    eigen_assert(false && "The default device should be used instead to generate kernel code");
#endif
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifndef __CUDA_ARCH__
    cudaError_t err = cudaMemcpyAsync(dst, src, n, cudaMemcpyDeviceToDevice,
                                      stream_->stream());
    assert(err == cudaSuccess);
#else
    eigen_assert(false && "The default device should be used instead to generate kernel code");
#endif
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
#ifndef __CUDA_ARCH__
    cudaError_t err =
        cudaMemcpyAsync(dst, src, n, cudaMemcpyHostToDevice, stream_->stream());
    assert(err == cudaSuccess);
#else
    eigen_assert(false && "The default device should be used instead to generate kernel code");
#endif
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
#ifndef __CUDA_ARCH__
    cudaError_t err =
        cudaMemcpyAsync(dst, src, n, cudaMemcpyDeviceToHost, stream_->stream());
    assert(err == cudaSuccess);
#else
    eigen_assert(false && "The default device should be used instead to generate kernel code");
#endif
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
#ifndef __CUDA_ARCH__
    cudaError_t err = cudaMemsetAsync(buffer, c, n, stream_->stream());
    assert(err == cudaSuccess);
#else
    eigen_assert(false && "The default device should be used instead to generate kernel code");
#endif
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE size_t numThreads() const {
    // FIXME
    return 32;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    // FIXME
    return 48*1024;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // We won't try to take advantage of the L2 cache for the time being, and
    // there is no L3 cache on CUDA devices.
    return firstLevelCacheSize();
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void synchronize() const {
#ifndef __CUDA_ARCH__
    cudaError_t err = cudaStreamSynchronize(stream_->stream());
    assert(err == cudaSuccess);
#else
    assert(false && "The default device should be used instead to generate kernel code");
#endif
  }

  inline int getNumCudaMultiProcessors() const {
    return stream_->deviceProperties().multiProcessorCount;
  }
  inline int maxCudaThreadsPerBlock() const {
    return stream_->deviceProperties().maxThreadsPerBlock;
  }
  inline int maxCudaThreadsPerMultiProcessor() const {
    return stream_->deviceProperties().maxThreadsPerMultiProcessor;
  }
  inline int sharedMemPerBlock() const {
    return stream_->deviceProperties().sharedMemPerBlock;
  }
  inline int majorDeviceVersion() const {
    return stream_->deviceProperties().major;
  }

  // This function checks if the CUDA runtime recorded an error for the
  // underlying stream device.
  inline bool ok() const {
    cudaError_t error = cudaStreamQuery(stream_->stream());
    return (error == cudaSuccess) || (error == cudaErrorNotReady);
  }

 private:
  const StreamInterface* stream_;
};
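
// Illustrative usage sketch (hypothetical snippet, requires compiling with
// nvcc and EIGEN_USE_GPU): wrap a CUDA stream in a GpuDevice, copy data to
// the device, run a tensor expression, and copy the result back. host_in and
// host_out stand in for caller-provided host buffers of 100 floats.
//
//   Eigen::CudaStreamDevice stream;        // default stream on the current device
//   Eigen::GpuDevice device(&stream);
//
//   const size_t bytes = 100 * sizeof(float);
//   float* d_in  = static_cast<float*>(device.allocate(bytes));
//   float* d_out = static_cast<float*>(device.allocate(bytes));
//   device.memcpyHostToDevice(d_in, host_in, bytes);
//
//   Eigen::TensorMap<Eigen::Tensor<float, 1>> in(d_in, 100), out(d_out, 100);
//   out.device(device) = in * in;          // evaluated by a CUDA kernel on the stream
//
//   device.memcpyDeviceToHost(host_out, d_out, bytes);
//   device.synchronize();                  // the copies and the kernel are asynchronous
//   device.deallocate(d_in); device.deallocate(d_out);
//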


#define LAUNCH_CUDA_KERNEL(kernel, gridsize, blocksize, sharedmem, device, ...)            \
  (kernel) <<< (gridsize), (blocksize), (sharedmem), (device).stream() >>> (__VA_ARGS__);  \
  assert(cudaGetLastError() == cudaSuccess);
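
// Illustrative usage sketch (hypothetical snippet; my_kernel, d_in and d_out
// stand in for a user-defined __global__ function and its arguments):
//
//   Eigen::CudaStreamDevice stream;
//   Eigen::GpuDevice device(&stream);
//   LAUNCH_CUDA_KERNEL(my_kernel, /*gridsize=*/32, /*blocksize=*/256,
//                      /*sharedmem=*/0, device, d_in, d_out, 100);
//   device.synchronize();
//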


// FIXME: Should be device and kernel specific.
static inline void setCudaSharedMemConfig(cudaSharedMemConfig config) {
  cudaError_t status = cudaDeviceSetSharedMemConfig(config);
  assert(status == cudaSuccess);
}
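
// Illustrative usage sketch (hypothetical snippet):
//
//   Eigen::setCudaSharedMemConfig(cudaSharedMemBankSizeEightByte);
//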

#endif

} // end namespace Eigen

#endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_TYPE_H