libqicore-api  2.8.7.4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
fileoperation.hxx
Go to the documentation of this file.
1 #pragma once
2 #ifndef _QICORE_FILEOPERATION_HPP_
3 #define _QICORE_FILEOPERATION_HPP_
4 
5 #include <iostream>
6 #include <memory>
7 #include <boost/filesystem/fstream.hpp>
8 #include <boost/filesystem/operations.hpp>
9 #include <qi/detail/warn_push_ignore_deprecated.hpp>
10 
11 namespace qi
12 {
19  {
20  public:
24  virtual ~FileOperation()
25  {
26  auto task = std::move(_task);
27  if (task)
28  {
29  task->promise.future().cancel();
30  }
31  }
32 
33  // Move only
34  FileOperation(const FileOperation&) = delete;
35  FileOperation& operator=(const FileOperation&) = delete;
36 
41  FileOperation(FileOperation&& other) // TODO VS2015 C++11: = default;
42  : _task(std::move(other._task))
43  {}
44 
49  FileOperation& operator=(FileOperation&& other) // TODO VS2015 C++11: = default;
50  {
51  _task = std::move(other._task);
52  return *this;
53  }
54 
62  qi::Future<void> start()
63  {
64  if (!_task)
65  {
66  throw std::runtime_error{ "Tried to start an invalid FileOperation" };
67  }
68 
69  if (_task->isLaunched.swap(true))
70  {
71  throw std::runtime_error{ "Called FileOperation::start() more than once!" };
72  }
73 
74  return _task->run();
75  }
76 
88  qi::Future<void> detach()
89  {
90  boost::shared_ptr<Task> sharedTask = std::move(_task);
91 
92  if (!sharedTask)
93  {
94  throw std::runtime_error("Called FileOperation::detach() but no task is owned!");
95  }
96 
97  if (!sharedTask->isLaunched._value)
98  {
99  throw std::runtime_error("Called FileOperation::detach() but task was not started!");
100  }
101 
102  auto future = sharedTask->promise.future();
103  future.connect([sharedTask](const Future<void>&){}); // keep the task alive until it ends
104  return future;
105  }
106 
108  auto operator()() -> decltype(start()) { return start(); }
109 
113  ProgressNotifierPtr notifier() const { return _task ? _task->localNotifier : ProgressNotifierPtr{}; }
114 
120  bool isValid() const { return _task ? true : false; }
121 
123  explicit operator bool() const { return isValid(); }
124 
125  protected:
// Base state for one file operation: the source file, its size, the promise
// reporting the outcome, and a local plus a remote progress notifier.
// Derived tasks implement start(); run() drives it.
//
// Every remote-notifier call is dispatched on isRemoteDeprecated: when true
// the underscore-prefixed method is called, otherwise the plain-named one.
//
// NOTE(review): this listing skips original lines 133, 181, 184 and 185.
// sourceFile, localNotifier and remoteNotifier are used below but their
// declarations (and localNotifier's initializer) are not visible here;
// confirm against the real header before editing.
126  struct Task
127  : public boost::enable_shared_from_this<Task>
128  {
// Stores the file, reads its size and fetches its operation-progress notifier.
// isRemoteDeprecated becomes true when the file's meta-object has no method
// named "read".
129  Task(FilePtr file)
130  : sourceFile{ std::move(file) }
131  , fileSize{ sourceFile->size() }
132  , promise{ PromiseNoop<void> }
134  , remoteNotifier{ sourceFile->operationProgress() }
135  , isRemoteDeprecated(sourceFile.metaObject().findMethod("read").empty())
136  {
137  }
138 
139  virtual ~Task() = default;
140 
// Resets both notifiers, marks both as running, calls start(), and returns
// the promise's future.
141  qi::Future<void> run()
142  {
143  localNotifier->reset();
144  isRemoteDeprecated ? remoteNotifier->_reset() : remoteNotifier->reset();
145  localNotifier->notifyRunning();
146  isRemoteDeprecated ? remoteNotifier->_notifyRunning() : remoteNotifier->notifyRunning();
147  start();
148  return promise.future();
149  }
150 
// Sets the promise's value, then notifies both notifiers that the operation finished.
151  void finish()
152  {
153  promise.setValue(0);
154  localNotifier->notifyFinished();
155  isRemoteDeprecated ? remoteNotifier->_notifyFinished() : remoteNotifier->notifyFinished();
156  }
157 
// Sets errorMessage as the promise's error, then notifies both notifiers of the failure.
158  void fail(const std::string& errorMessage)
159  {
160  promise.setError(errorMessage);
161  localNotifier->notifyFailed();
162  isRemoteDeprecated ? remoteNotifier->_notifyFailed() : remoteNotifier->notifyFailed();
163  }
164 
// Marks the promise as canceled, then notifies both notifiers of the cancellation.
165  void cancel()
166  {
167  promise.setCanceled();
168  localNotifier->notifyCanceled();
169  isRemoteDeprecated ? remoteNotifier->_notifyCanceled() : remoteNotifier->notifyCanceled();
170  }
171 
// Forwards newProgress to both notifiers.
172  void notifyProgressed(double newProgress)
173  {
174  localNotifier->notifyProgressed(newProgress);
175  isRemoteDeprecated ? remoteNotifier->_notifyProgressed(newProgress) : remoteNotifier->notifyProgressed(newProgress);
176  }
177 
// Operation-specific work; invoked by run().
178  virtual void start() = 0;
179 
// Set by FileOperation::start() to reject a second launch.
180  qi::Atomic<bool> isLaunched{ false };
// Value of sourceFile->size() captured at construction.
182  const std::streamsize fileSize;
// Carries the outcome set by finish(), fail() or cancel().
183  Promise<void> promise;
// True when the source file exposes no "read" method (see constructor).
186  const bool isRemoteDeprecated;
187  };
188 
189  using TaskPtr = boost::shared_ptr<Task>;
190 
191  explicit FileOperation(TaskPtr task)
192  : _task{ std::move(task) }
193  {
194  if (!_task)
195  throw std::runtime_error("FileOperation requires a non-null task on constrution.");
196  }
197 
198  private:
199  TaskPtr _task;
200  };
201 
203 
205  using FileOperationPtr = Object<FileOperation>;
206 
209  : public FileOperation
210  {
211  public:
218  FileCopyToLocal(qi::FilePtr file, qi::Path localPath)
219  : FileOperation(boost::make_shared<Task>(std::move(file), std::move(localPath)))
220  {
221  }
222 
223  private:
224  class Task
225  : public FileOperation::Task
226  {
227  public:
228  Task(FilePtr sourceFile, qi::Path localFilePath)
229  : FileOperation::Task(std::move(sourceFile))
230  , localPath(std::move(localFilePath))
231  {
232  }
233 
234  void start() override
235  {
236  if (makeLocalFile())
237  {
238  fetchData();
239  }
240  }
241 
242  void stop()
243  {
244  localFile.close();
245  finish();
246  }
247 
248  bool makeLocalFile()
249  {
250  if (localPath.isEmpty()) {
251  return true;
252  }
253 
254  localFile.open(localPath.bfsPath(), std::ios::out | std::ios::binary);
255  if (!localFile.is_open())
256  {
257  fail("Failed to create local file copy.");
258  return false;
259  }
260  return true;
261  }
262 
263  void write(Buffer buffer)
264  {
265  if (localFile.is_open())
266  localFile.write(static_cast<const char*>(buffer.data()), buffer.totalSize());
267  else
268  std::cout.write(static_cast<const char*>(buffer.data()), buffer.totalSize());
269  bytesWritten += buffer.totalSize();
270  assert(fileSize >= bytesWritten);
271 
272  const double progress = static_cast<double>(bytesWritten) / static_cast<double>(fileSize);
273  notifyProgressed(progress);
274  }
275 
276  void fetchData()
277  {
278  static const size_t ARBITRARY_BYTES_TO_READ_PER_CYCLE = 512 * 1024;
279  auto myself = shared_from_this();
280 
281  const auto readFuncName = isRemoteDeprecated ? "_read" : "read";
282 
283  sourceFile.async<Buffer>(readFuncName, bytesWritten, ARBITRARY_BYTES_TO_READ_PER_CYCLE)
284  .connect([this, myself](Future<Buffer> futureBuffer)
285  {
286  if (futureBuffer.hasError())
287  {
288  fail(futureBuffer.error());
289  clearLocalFile();
290  return;
291  }
292  if (promise.isCancelRequested())
293  {
294  clearLocalFile();
295  cancel();
296  return;
297  }
298 
299  write(futureBuffer.value());
300  if (bytesWritten < fileSize)
301  fetchData();
302  else
303  stop();
304  }
305  );
306  }
307 
308  void clearLocalFile()
309  {
310  if (localFile.is_open())
311  localFile.close();
312  boost::filesystem::remove(localPath);
313  }
314 
315  boost::filesystem::ofstream localFile;
316  std::streamsize bytesWritten = 0;
317  const qi::Path localPath;
318  };
319 
320  };
321 
329  QICORE_API FutureSync<void> copyToLocal(FilePtr file, Path localPath);
330 }
331 
332 #include <qi/detail/warn_pop_ignore_deprecated.hpp>
333 #endif
qi::Future< void > start()
Object< FileOperation > FileOperationPtr
Pointer to a file operation with shared semantics.
FileOperation(FileOperation &&other)
virtual ~Task()=default
ProgressNotifierPtr notifier() const
qi::Object< File > FilePtr
Pointer to a file with shared/remote semantics.
Definition: file.hpp:213
FutureSync< void > copyToLocal(FilePtr file, Path localPath)
boost::shared_ptr< Task > TaskPtr
qi::Object< ProgressNotifier > ProgressNotifierPtr
Pointer to a ProgressNotifier with shared/remote semantics.
Definition: file.hpp:114
void fail(const std::string &errorMessage)
FileOperation(const FileOperation &)=delete
FileOperation & operator=(const FileOperation &)=delete
qi::Future< void > run()
virtual void start()=0
const ProgressNotifierPtr localNotifier
ProgressNotifierPtr createProgressNotifier(Future< void > operationFuture={})
const ProgressNotifierPtr remoteNotifier
void notifyProgressed(double newProgress)
FileCopyToLocal(qi::FilePtr file, qi::Path localPath)
auto operator()() -> decltype(start())
Call operator: calls start()
FileOperation(TaskPtr task)
qi::Atomic< bool > isLaunched
const std::streamsize fileSize
bool isValid() const
virtual ~FileOperation()
FileOperation & operator=(FileOperation &&other)
qi::Future< void > detach()
#define QICORE_API
Definition: api.hpp:14