D 编程 - 并发


并发性是指一个程序同时在多个线程上运行。并发程序的一个例子是同时响应许多客户端的 Web 服务器。并发性对于消息传递来说很容易,但如果它们基于数据共享,则很难编写。

线程之间传递的数据称为消息。消息可以由任意类型和任意数量的变量组成。每个线程都有一个id,用于指定消息的接收者。任何启动另一个线程的线程都称为新线程的所有者。

在 D 中启动线程

函数spawn()接受一个指针作为参数,并从该函数启动一个新线程。该函数执行的任何操作(包括它可能调用的其他函数)都将在新线程上执行。所有者和工人都开始单独执行,就好像它们是独立的程序一样。

例子

import std.stdio; 
import std.stdio; 
import std.concurrency; 
import core.thread;
  
void worker(int a) { 
   foreach (i; 0 .. 4) { 
      Thread.sleep(1); 
      writeln("Worker Thread ",a + i); 
   } 
}

void main() { 
   foreach (i; 1 .. 4) { 
      Thread.sleep(2); 
      writeln("Main Thread ",i); 
      spawn(≈worker, i * 5); 
   }
   
   writeln("main is done.");  
}

当上面的代码被编译和执行时,它会读取上一节中创建的文件并产生以下结果 -

Main Thread 1 
Worker Thread 5 
Main Thread 2 
Worker Thread 6 
Worker Thread 10 
Main Thread 3 
main is done. 
Worker Thread 7 
Worker Thread 11 
Worker Thread 15 
Worker Thread 8 
Worker Thread 12 
Worker Thread 16 
Worker Thread 13
Worker Thread 17 
Worker Thread 18

D 中的线程标识符

在模块级别全局可用的thisTid变量始终是当前线程的 id。您还可以在调用spawn 时接收threadId。一个例子如下所示。

例子

import std.stdio; 
import std.concurrency;  

void printTid(string tag) { 
   writefln("%s: %s, address: %s", tag, thisTid, &thisTid); 
} 
 
void worker() { 
   printTid("Worker"); 
}
  
void main() { 
   Tid myWorker = spawn(&worker); 
   
   printTid("Owner "); 
   
   writeln(myWorker); 
}

当上面的代码被编译和执行时,它会读取上一节中创建的文件并产生以下结果 -

Owner : Tid(std.concurrency.MessageBox), address: 10C71A59C 
Worker: Tid(std.concurrency.MessageBox), address: 10C71A59C 
Tid(std.concurrency.MessageBox)

D 中的消息传递

函数 send() 发送消息,函数 receiveOnly() 等待特定类型的消息。还有其他名为prioritySend()、receive() 和receiveTimeout() 的函数,稍后将进行解释。

以下程序中的所有者向其工作人员发送一条 int 类型的消息,并等待来自 double 类型的工作人员的消息。线程继续来回发送消息,直到所有者发送一个负整数。一个例子如下所示。

例子

import std.stdio; 
import std.concurrency; 
import core.thread; 
import std.conv;  

void workerFunc(Tid tid) { 
   int value = 0;  
   while (value >= 0) { 
      value = receiveOnly!int(); 
      auto result = to!double(value) * 5; tid.send(result);
   }
} 
 
void main() { 
   Tid worker = spawn(&workerFunc,thisTid); 
    
   foreach (value; 5 .. 10) { 
      worker.send(value); 
      auto result = receiveOnly!double(); 
      writefln("sent: %s, received: %s", value, result); 
   }
   
   worker.send(-1); 
} 

当上面的代码被编译和执行时,它会读取上一节中创建的文件并产生以下结果 -

sent: 5, received: 25 
sent: 6, received: 30 
sent: 7, received: 35 
sent: 8, received: 40 
sent: 9, received: 45 

使用 D 中的 Wait 进行消息传递

下面显示了一个使用等待传递消息的简单示例。

import std.stdio; 
import std.concurrency; 
import core.thread; 
import std.conv; 
 
void workerFunc(Tid tid) { 
   Thread.sleep(dur!("msecs")( 500 ),); 
   tid.send("hello"); 
}
  
void main() { 
   spawn(&workerFunc,thisTid);  
   writeln("Waiting for a message");  
   bool received = false;
   
   while (!received) { 
      received = receiveTimeout(dur!("msecs")( 100 ), (string message) { 
         writeln("received: ", message); 
      });

      if (!received) { 
         writeln("... no message yet"); 
      }
   } 
}

当上面的代码被编译和执行时,它会读取上一节中创建的文件并产生以下结果 -

Waiting for a message 
... no message yet 
... no message yet 
... no message yet 
... no message yet 
received: hello