Passing output from afterEach to variable

Matt Tejer on 29 Dec 2024
Edited: Thomas Falch on 15 May 2025 at 10:23
Hello,
I'm trying to use parfeval running in the background to get images from multiple cameras and process them.
I tried using parallel.pool.DataQueue and parallel.pool.PollableDataQueue to get data from parfeval, but the data being sent is too large, which slows down the frame rate and quickly fills up the available RAM.
My questions are:
  1. Is there a way to fetch data on request instead of using a queue? I need only the newest data, so I do not want the worker to send data to a queue after each iteration. The data can stay inside the worker until it is needed.
  2. Can a queue be limited to size 1?
  3. How can I use afterEach to save data to a variable? I can only find examples that use disp or update data on figures, not ones that save to the workspace.
  4. Alternatively, is there a way to use timers in different threads instead of parfeval?
Thank you for your time,
Matt
  10 Comments
Walter Roberson on 31 Dec 2024

The documentation for timer has no section for extended capabilities, so there is no support for timers in thread pools.

I am not sure at the moment if there is support for timers in process pools. If there is, you would still have the same problem about returning the data to another process.

Walter Roberson on 31 Dec 2024

There is a faint chance that calling the external routines is compatible with thread pools, but I think it is unlikely.


Accepted Answer

Walter Roberson on 29 Dec 2024
This method is fairly clumsy, but in theory it would work.
Use a parpool size of 1.
Use parfeval() to create a parallel.pool.PollableDataQueue object and return it to the caller.
Create a parallel.pool.DataQueue object in the client workspace.
Call parfeval() to start the frame-grabbing function on the worker, passing in the DataQueue object.
Call afterEach() on the DataQueue to register a callback that runs on the client.
Now, when the client is ready to accept data, send() a signal (any value) from the client to the PollableDataQueue.
Meanwhile, the worker is looping around fetching frames, and checking the QueueLength property of the PollableDataQueue. When the queue becomes non-empty, poll() the PollableDataQueue to "eat" the signal; then have the worker write the most recent frame to the DataQueue object. If the queue is empty, just loop back to accepting the next frame.
The afterEach function on the DataQueue receives the frame and stores it (and does whatever with it.) Oh, and the afterEach probably does the next send() to the PollableDataQueue to signal that the client is ready for more data.
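The steps above could be sketched roughly as follows. This is only an illustration under some assumptions: captureLoop, onFrame and grabFrame are hypothetical names, grabFrame is a stand-in for the real camera read, and the worker-side queue is created through a parallel.pool.Constant so that the same queue object is reachable from both parfeval calls. The worker checks for a request with a zero-timeout poll() instead of reading QueueLength, which checks and "eats" the signal in one call. A containers.Map is used as the storage because it is a handle object, so the afterEach callback can update it in place.

```matlab
% Sketch only: captureLoop, onFrame and grabFrame are hypothetical names.
pool = parpool(1);                        % single worker, as suggested above

% Build the request queue on the worker and fetch a handle to it.
requestConst = parallel.pool.Constant(@parallel.pool.PollableDataQueue);
requestQueue = fetchOutputs(parfeval(pool, @(c) c.Value, 1, requestConst));

% Frames come back on an ordinary DataQueue; the callback stores the newest one.
store = containers.Map();                 % handle object the callback can update
frameQueue = parallel.pool.DataQueue;
afterEach(frameQueue, @(frame) onFrame(frame, store, requestQueue));

f = parfeval(pool, @captureLoop, 0, requestConst, frameQueue);
send(requestQueue, true);                 % first "client is ready" signal

function captureLoop(requestConst, frameQueue)
    requestQueue = requestConst.Value;    % worker-side end of the request queue
    while true
        frame = grabFrame();
        [~, requested] = poll(requestQueue, 0);   % zero timeout: do not wait
        if requested
            send(frameQueue, frame);      % send only the most recent frame
        end
    end
end

function onFrame(frame, store, requestQueue)
    store('latest') = frame;              % keep the newest frame on the client
    send(requestQueue, true);             % tell the worker we are ready again
end

function frame = grabFrame()
    frame = rand(480, 640);               % placeholder for a real camera read
    pause(0.01);
end
```

On the client, store('latest') then holds the newest frame once the callback has run (callbacks are processed when MATLAB is idle, for example during pause or drawnow), and cancel(f) stops the worker loop.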

More Answers (1)

Thomas Falch on 15 May 2025 at 8:56
Edited: Thomas Falch on 15 May 2025 at 10:23
Starting in R2025a you can use the new "any-destination" parallel.pool.PollableDataQueue to more easily send data from the client to workers (or from workers to workers). It is documented on the PollableDataQueue reference page ("Send and poll data between client and workers").
You can use it as a replacement for the queue created on the worker in the second step of Walter's answer. Alternatively, particularly if you want to use more workers, a solution could look like this:
% Runs on a worker: creates data on the client's request and sends it back.
% createData, getDataParams and processData are placeholders for your own code.
function createDataOnDemand(toClientQueue, toWorkerQueue)
    while true
        % Wait for the next request from the client; exit if the client closes the queue
        [dataParams, didReceive] = toWorkerQueue.poll(Inf);
        if ~didReceive
            break
        end
        % Generate the next piece of data and send it back to the client
        data = createData(dataParams);
        toClientQueue.send(data);
    end
end

pool = parpool();
toClientQueue = parallel.pool.PollableDataQueue(Destination="any");
toWorkerQueue = parallel.pool.PollableDataQueue(Destination="any");
% The worker function returns nothing, so request 0 outputs
f = parfeval(@createDataOnDemand, 0, toClientQueue, toWorkerQueue);

% Drive the work from the client: send a request for data, then receive and
% process it
n = 10; % number of requests (example value, not specified in the original)
for i = 1:n
    dataParams = getDataParams(i);
    toWorkerQueue.send(dataParams);
    data = toClientQueue.poll(Inf);
    processData(data)
end
toWorkerQueue.close();

Release

R2021a
