メインコンテンツ

parallel.pool.PollableDataQueue

クライアントとワーカーの間でのデータの送信とポーリング

    説明

    PollableDataQueue オブジェクトにより、計算中にクライアントと並列プール内のワーカー間でデータまたはメッセージの同期送信およびポーリングができます。たとえば、ワーカーから、中間値をクライアントまたは別のワーカーに送信して、その値を別の計算に使用できます。次のことも可能です。

    • クライアントまたは任意のワーカーからデータを送信する。

    • キューの作成元のクライアントまたはワーカーにのみデータの受信を許可する PollableDataQueue オブジェクトを作成する。

    • クライアントまたはプール内の任意のワーカーにデータの受信を許可する PollableDataQueue オブジェクトを作成する。 (R2025a 以降)

    その他すべてのハンドル オブジェクトとは異なり、PollableDataQueue オブジェクトと DataQueue オブジェクトは転送されても接続されたままになります。

    作成

    説明

    q = parallel.pool.PollableDataQueue は、クライアントとワーカー間でのデータの送信とポーリングに使用できる PollableDataQueue オブジェクトを作成します。結果の PollableDataQueue オブジェクトは、その作成元のクライアントまたはワーカーによってのみポーリングできます。データを受信するワーカーまたはクライアント上で、PollableDataQueue を作成します。

    q = parallel.pool.PollableDataQueue(Destination=destination) は、PollableDataQueue オブジェクトの送信先動作を設定します。 (R2025a 以降)

    クライアントまたは任意のワーカーで PollableDataQueue オブジェクトをポーリングしてデータを受信できるようにする場合、Destination="any" を設定します。

    入力引数

    すべて展開する

    R2025a 以降

    キューの送信先動作。次の値のいずれかとして指定します。

    • "creator" - キューの作成元のクライアントまたはワーカーにのみ、キューのポーリングとデータの受信を許可します。キューに送信されたデータは、キューの作成元のクライアントまたはワーカーに即座に送信されます。

    • "any" - クライアントまたは並列プール内の任意のワーカーに、キューのポーリングとデータの受信を許可します。データはキュー内で待機し、キューをポーリングしたクライアントまたはワーカーに送信され、そのクライアントまたはワーカーが特定のデータの送信先になります。

    プロパティ

    すべて展開する

    R2025a 以降

    このプロパティは、close オブジェクト関数を使用してキューを閉じた後、読み取り専用になります。

    キューのクローズ状態。次のいずれかの値として指定します。

    • false - キューは閉じておらず、キューにデータを送信できます。

    • true - キューは閉じていて、キューにデータを送信できません。キューにデータを送信しようとするとエラーになります。キューに対するデータのポーリングは続行できます。閉じているキューを再度開くことはできません。

    データ型: logical

    この プロパティ は読み取り専用です。

    ワーカーまたはクライアントがポーリングによって受信できる可能性のある、現在キューに保持されているデータ アイテム数。ゼロまたは正の整数として指定します。

    名前と値の引数 Destination を使用して設定されるキューの送信先動作によって、QueueLength プロパティの値が次のように決まります。

    • Destination 引数を設定せずに PollableDataQueue オブジェクトを作成する場合、または Destination"creator" に設定する場合、QueueLength は、PollableDataQueue オブジェクトを作成するワーカーまたはクライアント上で 0 または正の整数です。

      • クライアントが PollableDataQueue オブジェクトを作成する場合、値はすべてのワーカー上で 0 です。

      • いずれかのワーカーが PollableDataQueue を作成する場合、値はクライアント上およびその他すべてのワーカー上で 0 です。

    • Destination"any" に設定する場合、値はクライアント上およびすべてのワーカー上で 0 または正の整数です。

    R2025a より前: PollableDataQueue オブジェクトを作成するワーカーまたはクライアント上で、QueueLength プロパティの値は 0 または正の整数です。クライアントが PollableDataQueue オブジェクトを作成する場合、値はすべてのワーカー上で 0 です。いずれかのワーカーが PollableDataQueue を作成する場合、値はクライアント上およびその他すべてのワーカー上で 0 です。

    オブジェクト関数

    closeClose pollable data queue
    poll ポーリング可能なデータ キューに送信されたデータを取得
    sendデータ キューを使用したクライアントとワーカーの間でのデータの送信

    すべて折りたたむ

    PollableDataQueue オブジェクトを作成します。

    p = parallel.pool.PollableDataQueue;
    

    parfor ループを実行し、値 1 をもつデータなどのメッセージを送信します。

    parfor idx = 1
        send(p,idx); 
    end
    

    結果をポーリングします。

    poll(p)
    1
    

    PollableDataQueue オブジェクトによるデータのポーリングの詳細については、poll を参照してください。

    R2025a 以降

    クライアントから並列プール内の複数のワーカーにメッセージを送信するには、Destination"any" に設定した PollableDataQueue オブジェクトを使用します。

    4 つのスレッド ワーカーからなるプールを起動します。

    numWorkers = 4;
    pool = parpool("Threads",numWorkers);
    Starting parallel pool (parpool) using the 'Threads' profile ...
    Connected to parallel pool with 4 workers.
    

    2 つの PollableDataQueue オブジェクト、すなわちワーカーにメッセージを送信するための workerPdq という名前のキュー (作成時の Destination の設定: "any") と、ワーカーからのメッセージを受信するための clientPdq という名前のキューを作成します。

    workerPdq = parallel.pool.PollableDataQueue(Destination="any");
    clientPdq = parallel.pool.PollableDataQueue;

    parfevalOnAll を使用して、すべてのワーカーで analyzeMessage 補助関数を実行します。workerPdq キューと clientPdq キューを引数として関数に渡します。

    parfevalOnAll(@analyzeMessage,0,workerPdq,clientPdq);

    workerPdq キューを介して、各ワーカーに個別のメッセージを送信します。

    for idx = 1:numWorkers
        send(workerPdq,compose("Hello, Worker %d!",idx));
    end

    clientPdq キューをポーリングして、ワーカーからのメッセージを受信します。各メッセージを無期限に待機するには inf を使用します。

    for idx = 1:numWorkers
        poll(clientPdq,inf)
    end
    ans = 
    "Worker 1 received message!"
    
    ans = 
    "Worker 2 received message!"
    
    ans = 
    "Worker 3 received message!"
    
    ans = 
    "Worker 4 received message!"
    

    各ワーカーで実行される補助関数 analyzeMessage を定義します。この関数は、inQueue キューに対するメッセージのポーリングを実行し、ワーカー番号を抽出します。次に、この関数は確認メッセージを outQueue キューに送り返します。

    function analyzeMessage(inQueue,outQueue)
        message = poll(inQueue,2);
        workerNum = sscanf(message,"Hello, Worker %u");
        send(outQueue,compose("Worker %d received message!",workerNum));
        pause(2)
    end

    PollableDataQueue オブジェクトにメッセージを送信すると、メッセージはキューで待機します。メッセージごとに、キューの長さに 1 が加算されます。poll を使用すると、1 つのメッセージがキューから収集されます。この例では、QueueLength プロパティを使用し、PollableDataQueue オブジェクトの長さを求めて、その長さに Destination 引数がどのような影響を与えるかを確認します。

    まず、1 つのワーカーをもつ並列プールを作成します。

    parpool(1);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 1 workers.
    

    PollableDataQueue オブジェクトを作成します。既定では、parallel.pool.PollableDataQueue 関数は、Destination"creator" に設定して PollableDataQueue オブジェクトを作成します。このタイプの PollableDataQueue オブジェクトでは、キューの作成元のクライアントまたはワーカーにのみ、オブジェクトに対するデータのポーリングを許可します。

    queue = parallel.pool.PollableDataQueue
    queue = 
     PollableDataQueue with properties: 
    
              QueueLength: 0
                 IsClosed: false
    
    

    初期状態では、キューは空です。クライアント上およびワーカー上のキューの長さを確認します。クライアントとワーカーの両方で QueueLength プロパティの値は 0 です。

    fprintf("Queue length on the client: %i\n",queue.QueueLength)
    Queue length on the client: 0
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queue.QueueLength)
    end
    Queue length on the worker: 0
    

    次に、ワーカーからキューにメッセージを送信します。続いて、QueueLength プロパティを使用してキューの長さを求めます。Destination"creator" に設定されると、QueueLength プロパティの値はクライアント (キューの作成元) 上で 1、ワーカー上で 0 です。

    parfor idx = 1
        send(queue,"A message");
    end
    fprintf("Queue length on the client: %i\n",queue.QueueLength)
    Queue length on the client: 1
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queue.QueueLength)
    end
    Queue length on the worker: 0
    

    poll を使用して、キューからメッセージを取得します。

    msg = poll(queue)
    msg = 
    "A message"
    

    キューの長さをもう一度確認します。メッセージを削除したため、現在の QueueLength プロパティの値は 0 です。

    fprintf("Queue length on the client: %i\n",queue.QueueLength)
    Queue length on the client: 0
    

    Destination"any" に設定して PollableDataQueue オブジェクトを作成します。このコマンドは、クライアントまたはプール内の任意のワーカーによるデータ受信のためのポーリングを可能にする PollableDataQueue オブジェクトを作成します。

    queueAny = parallel.pool.PollableDataQueue(Destination="any")
    queueAny = 
     PollableDataQueue with properties: 
    
              QueueLength: 0
                 IsClosed: false
    
    

    このキューにメッセージを送信します。

    parfor idx = 1
        send(queueAny,"Another message");
    end

    キューの長さを確認します。Destination"any" に設定すると、クライアントとワーカーの両方で QueueLength プロパティの値は 1 を示し、クライアントまたはワーカーによるデータ受信のためのキューのポーリングが可能なことを意味しています。

    fprintf("Queue length on the client: %i\n", queueAny.QueueLength);
    Queue length on the client: 1
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queueAny.QueueLength);
    end
    Queue length on the worker: 1
    

    最後に、キューからメッセージを取得して、キューの長さを確認します。キューの処理が完了しているため、QueueLength プロパティの値は 0 です。

    msg = poll(queueAny)
    msg = 
    "Another message"
    
    fprintf("Queue length o the client: %i\n",queueAny.QueueLength);
    Queue length o the client: 0
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queueAny.QueueLength);
    end
    Queue length on the worker: 0
    

    ヒント

    • PollableDataQueue オブジェクトを使用して送信されたデータまたはメッセージは、手動でのデータ取得のみができます。クライアントでデータを受信した後に自動処理するには、代わりに parallel.pool.DataQueue オブジェクトを使用してデータを送信します。

    • R2025a より前: 並列プールのワーカーからクライアントにデータを送り返すには、まずクライアントに PollableDataQueue オブジェクトを作成します。この PollableDataQueue オブジェクトを、parfor ループ、または parfeval などの他の並列言語構成に渡します。ワーカーから send を呼び出して、データをクライアントに送り返します。クライアントで poll を使用して、ワーカーから送信されたメッセージまたはデータの結果を取得します。

    • R2025a より前: クライアントからワーカーにデータを送信するには、ワーカー上にキューを作成し、そのキューをクライアントに送り返します。このワークフローの例については、ワーカーでの通信の受信を参照してください。

    • R2023b より前: ワーカーから別のワーカーにデータを送信することはできません。ワーカー間でデータを転送するには、代わりに spmdspmdSend または spmdReceive を使用してください。

    拡張機能

    すべて展開する

    バージョン履歴

    R2017a で導入

    すべて展開する