How can I create a singleton object that can be used from parallel threads and processes?

What I wanted to achieve
I would like to create a singleton class (I called it Logger) that records logs to a file and prints them to the command window.
What exactly I want to achieve:
1. Reduce the number of input parameters for functions. To that end, the Logger must be initialized once, during project startup:
Logger.setLogger("someLogFile.txt");
After that, from anywhere in the code (any function, nested function, class, etc.), a message should be written to the file like this:
Logger.write("Some log message");
2. The Logger class should allow logs to be written to the file both from the main process and when called inside parallel threads or processes.
3. The class should prevent contention between processes/threads writing to the file.
What I tried
While implementing this, I found out that MATLAB's parallelization does not preserve persistent variables when a class object is passed to processes/threads.
MATLAB's parallelization also does not support singleton objects implemented with this construct:
classdef SingletonClass < handle
    properties (Constant)
        instance = SingletonClass();
    end
    ...
This is because the save-load mechanism used to transfer an object to parallel threads/processes does not preserve the values of Constant properties. As a result, the property values of the passed object are loaded first and then discarded when the Constant instance property is re-initialized on the worker.
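A minimal sketch of that failure mode (assuming SingletonClass above also has an ordinary settable property, hypothetically named value):
parpool("Processes");            % with a pool open, the parfor body runs on workers
s = SingletonClass.instance;
s.value = 42;                    % state set on the client
parfor idx = 1:1
    w = SingletonClass.instance; % the Constant property is re-initialized on the worker,
    disp(w.value);               % so the client-side state is not visible here
end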
I also tried storing a handle to the logging parallel.pool.DataQueue in a global variable, but I did not like that approach.
I also tried storing the log file path in an environment variable via setenv and writing to it directly from any function (see the sketch below), but this does not solve the problem of competing processes.
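Roughly, that setenv approach looked like this (a sketch; the variable name LOGGER_FILE is made up). Every caller opens the file independently, so concurrent workers can interleave their writes:
setenv("LOGGER_FILE", "examplelogs.txt");  % once, at startup
% ... later, from any function:
fid = fopen(getenv("LOGGER_FILE"), 'a');
fprintf(fid, "some log message\n");
fclose(fid);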
What was done
As a result, I was able to create this code:
classdef Logger < handle
    properties (Constant)
        instance = Logger();
    end
    properties (SetAccess = private)
        filepath = strings(0);
        dataQueueListener
        initialized = false;
    end
    methods (Static)
        function setLogger(filepath, dataQueueListener)
            arguments
                filepath
                dataQueueListener
            end
            loggerObj = Logger.instance;
            loggerObj.filepath = filepath;
            loggerObj.dataQueueListener = dataQueueListener;
            % Attach the afterEach listener only on the client, not on workers
            if ~(parallel.internal.pool.isPoolThreadWorker || ~isempty(getCurrentJob))
                afterEach(loggerObj.dataQueueListener, @(input) loggerObj.writeLogs(input{:}));
            end
            loggerObj.initialized = true;
        end
        function write(logMessage)
            arguments
                logMessage
            end
            loggerObj = Logger.instance;
            if loggerObj.initialized
                send(loggerObj.dataQueueListener, {logMessage, loggerObj.filepath});
            else
                error("Logger:write:UninitializedObjectCall", ...
                    "Logger is not initialized\n");
            end
        end
        function writeLogs(logMessage, filepath)
            arguments
                logMessage
                filepath
            end
            logMessage = sprintf(logMessage); % expand escape sequences such as \n
            disp(char(logMessage));
            fid = fopen(filepath, 'a');
            fprintf(fid, logMessage);
            fclose(fid);
        end
    end
    methods (Access = private)
        function obj = Logger()
        end
    end
end
Then it is called like this (`anotherFileFunction` is a function in another file):
% Variables
dQL = parallel.pool.DataQueue;
defPath = "examplelogs.txt";
% Initialize Logger
Logger.setLogger(defPath, dQL);
Logger.write("Logger initialized\n");
% Example of calling Logger from a function in another file
anotherFileFunction();
% Example of logging with both types of parallelization
parpool('Threads');
parallelLogging(defPath, dQL);
delete(gcp('nocreate'));
parpool('Processes');
parallelLogging(defPath, dQL);

function parallelLogging(defPath, dQL)
% This is a function with its own workspace and parallel processing inside it
Logger.write("I'm in example of parallel Logging\n")
parfor idx = 1:10
    pauseSec = rand().*2;
    anotherFunction(pauseSec)
    Logger.write(sprintf("Thread/Process %d has been paused for %.3f seconds", idx, pauseSec));
end
end

function anotherFunction(pauseSec)
% This is a function which is called in the parfor loop
pause(pauseSec)
Logger.write("Pausing\n")
end

function anotherFileFunction()
% This is a function in a different file
Logger.write("I'm writing it from another file\n")
end
And it works: logging succeeds, and the console output appears as well. But it does not fully satisfy my first requirement: I am still forced to pass the log file path and a handle to the data queue into the parallelization function.
Questions
How can I create a singleton object that can be used from parallel threads and processes?
Is there a better way to do this than what I did?
How can I meet all the requirements described above?

Accepted Answer

Edric Ellis on 3 Mar 2025
I think your solution is heading in the right direction. Using a DataQueue is exactly what I'd suggest for getting the log messages from the workers to the client, and having the client do the writing is, I think, the right approach.
Here's how I would approach this. Untested code, but hopefully you get the idea. Basically you need to call
Logger.setupOnWorkers(gcp())
each time you create a pool. That's the main wrinkle. I don't think there's a simple way around it because the workers need some way of working out where to send the messages.
classdef Logger < handle % handle class, so the Constant instance can hold mutable state
    properties (Constant)
        Instance = Logger();
    end
    properties
        Filepath string = strings(0)
        ReceiveQueue parallel.pool.DataQueue = parallel.pool.DataQueue.empty()
        Listener event.listener
        SendQueue parallel.pool.DataQueue = parallel.pool.DataQueue.empty()
    end
    methods (Access = private)
        function obj = Logger()
        end
    end
    methods
        function writeMessage(obj, msg)
            % Actually write the data to our file.
            writelines(msg, obj.Filepath, WriteMode="append");
        end
    end
    methods (Static)
        function setup(filename)
            % Call this on the client to set up with a new filename.
            obj = Logger.Instance;
            obj.Filepath = filename;
            obj.ReceiveQueue = parallel.pool.DataQueue();
            obj.Listener = afterEach(obj.ReceiveQueue, @(msg) obj.writeMessage(msg));
        end
        function setupWithDataQueue(dataQueue)
            % This is really only needed internally to set up workers.
            obj = Logger.Instance;
            obj.SendQueue = dataQueue;
        end
        function setupOnWorkers(pool)
            % Call this on the client when a new pool has been created.
            % This sends the appropriate DataQueue to the workers, so that
            % they can use it.
            obj = Logger.Instance;
            assert(~isempty(obj.ReceiveQueue));
            future = parfevalOnAll(pool, @Logger.setupWithDataQueue, 0, obj.ReceiveQueue);
            fetchOutputs(future); % check for errors by calling fetchOutputs
        end
        function write(msg)
            % This method either sends the message to the client, or calls
            % the writeMessage method directly.
            obj = Logger.Instance;
            if ~isempty(obj.SendQueue)
                % We have a SendQueue, use that
                send(obj.SendQueue, msg);
            else
                % We have a Filepath, write directly.
                assert(~isempty(obj.Filepath), "Expected to have a Filepath");
                obj.writeMessage(msg);
            end
        end
    end
end
Unfortunately I can't think of a way around the problem of having to run some code each time you set up a new pool.
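Usage would look roughly as follows (an untested sketch based on the listing above, reusing the file name from the question):
Logger.setup("examplelogs.txt"); % once, on the client
pool = parpool("Processes");
Logger.setupOnWorkers(pool);     % must be repeated for every new pool
parfor idx = 1:4
    Logger.write(sprintf("Message from iteration %d", idx));
end
delete(pool);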
2 Comments
ZAKOVYIKA on 3 Mar 2025
Thanks a lot!
The idea of passing a predefined function handle to the workers is great!
I tried operating with both receive and send data queues, but I couldn't get it to work.
So I implemented it in an ugly way, using only one DataQueue object (it is an object, right?) and also passing a file name.
The final implementation (part of it) looks as follows:
classdef Logger < handle
    properties (Constant)
        instance = Logger();
    end
    properties
        filepath = strings(0);
        dataQueue = parallel.pool.DataQueue.empty();
        initialized = false;
    end
    properties
        verbose = true;
    end
    methods (Access = private)
        function obj = Logger()
        end
    end
    methods (Static)
        function setLogger(filepath, options)
            arguments
                filepath
                options.dataQueue = Logger.instance.dataQueue;
            end
            loggerObj = Logger.instance;
            loggerObj.filepath = filepath;
            if isempty(loggerObj.dataQueue) && isempty(options.dataQueue)
                error("Logger:setLogger:NoDataQueue", ...
                    "A data queue must be provided when the Logger object is first initialized.\n")
            else
                loggerObj.dataQueue = options.dataQueue;
            end
            % Attach the afterEach listener only once, and only on the client
            if ~loggerObj.initialized && ~(parallel.internal.pool.isPoolThreadWorker || ~isempty(getCurrentJob))
                afterEach(loggerObj.dataQueue, @(input) loggerObj.writeLogs(input{:}));
            end
            loggerObj.initialized = true;
        end
        ...
        function setupOnWorkers(pool)
            loggerObj = Logger.instance;
            filepath = loggerObj.filepath;
            dataQueue = loggerObj.dataQueue;
            if loggerObj.initialized
                future = parfevalOnAll(pool, @() Logger.setLogger(filepath, dataQueue = dataQueue), 0);
                fetchOutputs(future); % check for errors by calling fetchOutputs
            else
                error("Logger:setupOnWorkers:UninitializedObjectCall", ...
                    "Logger is not initialized\n");
            end
        end
        ...
    end
end
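This version is then called roughly like this (a sketch; it assumes the omitted write method behaves like the one in the earlier listing):
dQL = parallel.pool.DataQueue;
Logger.setLogger("examplelogs.txt", dataQueue = dQL); % once, on the client
pool = parpool("Threads");
Logger.setupOnWorkers(pool); % re-send the queue to every new pool's workers
parfor idx = 1:10
    Logger.write(sprintf("Iteration %d\n", idx));
end
delete(pool);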
Edric Ellis on 4 Mar 2025
The idea is that the "send" and "receive" DataQueue variables actually refer to the same underlying queue object. When you create a new DataQueue using the constructor, that's the "receive" end. When you pass that to a worker (as I did using parfevalOnAll), the object that the workers get is the "send" end. The main reason that I had differently named variables is to make it easier for the Logger.write method to work out whether it should be writing directly to the file, or whether it should be sending the message down the DataQueue. I.e. my approach aims to avoid having to check explicitly whether or not you're on a worker.
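A minimal standalone sketch of that behavior, independent of the Logger class (assumes a parallel pool is already running):
q = parallel.pool.DataQueue;                % constructed on the client: the "receive" end
afterEach(q, @(msg) fprintf("%s\n", msg));  % callback runs on the client for each message
parfor idx = 1:3
    % Inside parfor, the workers get a copy of q that acts as the "send" end
    send(q, sprintf("hello from iteration %d", idx));
end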


More Answers (1)

Walter Roberson on 1 Mar 2025
How can I create a singleton object that can be used from parallel threads and processes?
That is not possible with process-based workers. Process workers run as separate executables with their own address spaces, so it is not possible to create a singleton object that is shared between them.
