Szálak (Threads)

A szálak segítségével több, különálló végrehajtási egységre bonthatunk egy folyamatot.


 

  Rendszerhívások

#include <iostream>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <algorithm>
#include <sys/types.h>
#include <sys/wait.h>

using namespace std;

class PathManagerException{};

class PathManager{
private:
int size = 0;
string *s;

void insertElement(string element){
if(element.at(element.length()-1) != '/') element += '/';
s[size++] = element;
}

public:
PathManager(string path){
path.erase(0, 5);
int pos;
s = new string[count(path.begin(), path.end(), ':') + 1];
while (true) {
pos = path.find(':');
if (pos == -1) {
insertElement(path);
break;
}
insertElement(path.substr(0, pos));
path.erase(0, pos + 1);
}
}

~PathManager(){
delete[] s;
}

int getSize(){
return size;
}

string getElement(int index){
if(index > size-1)
throw PathManagerException();
return s[index];
}
};

int main(int argc, char *argv[], char *envp[]){

if(argc == 1){
cout << "Usage: " << argv[0] << " filename1 filename2 ..." << endl;
return -1;
}

bool isMain = true;
int wtf = 0, duperr;
pid_t pid;

PathManager pm(envp[15]);

char *arr[4];

string p0 = "stat", p1 = "-c \"\%a \%s %y \%n\"";

arr[0] = new char[p0.size()+1];
strcpy(arr[0], p0.c_str());

arr[1] = new char[p1.size()+1];
strcpy(arr[1], p1.c_str());

arr[0][p0.size()] = arr[1][p1.size()] = 0;
arr[3] = NULL;

for(int i = 1; i < argc; ++i){
for(int j = 0; j < pm.getSize(); ++j){

if((pid = fork()) < 0){
perror("fork error");
return -1;
}

if(!pid){
string str = pm.getElement(j) + argv[i];
arr[2] = new char[str.size()+1];
strcpy(arr[2], str.c_str());
arr[2][str.size()] = 0;
isMain = false;
break;
}

}

if(!pid) break;
}

duperr = dup(2);
close(2);

if(isMain) while(wait(&wtf) > 0);
else execv("/usr/bin/stat", arr);

dup2(duperr, 2);
close(duperr);

return 0;

}

  Jelzések

#include <iostream>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <vector>
#include <fstream>
#include <sys/types.h>

#define ever (;;)

using namespace std;

static void sig_usr1(int);
static void sig_usr2(int);

vector<string> v;

void importData(){
v.clear();
ifstream f;
f.open("input.txt");
string s;
while(f >> s)
v.push_back(s);
f.close();
}

int main(){

if(signal(SIGUSR1, sig_usr1) == SIG_ERR)
perror("Error in signal: SIGUSR2");
if(signal(SIGUSR2, sig_usr2) == SIG_ERR)
perror("Error in signal: SIGUSR1");

cout << "\033[1;33m" << "PID: " << getpid() << endl << endl;

importData();

for ever{
for(vector<string>::iterator it = v.begin(); it != v.end(); ++it){
cout << "\033[1;35m" << *it << ": " << "\033[0m" << endl;
string s = "ps -aux | awk '($11 == \"" + *it + "\"){print $1}' | sort -u";
system(s.c_str()); 
cout << endl;
}
usleep(30000000);
}



return 0;
}

static void sig_usr1(int sig){
cout << "\033[1;36m" << "SIGUSR1 triggered" << "\033[0m" << endl;
importData();
}

static void sig_usr2(int sig){
cout << "\033[1;36m" << "SIGUSR2 triggered" << "\033[0m" << endl;
exit(0);
}

  Osztott memória

  Szerver

#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <csignal>
#include <unistd.h>
#include <set>
#include "common.h"

using namespace std;

int shmid;

void signalHandler(int signum){
    shmctl(shmid, IPC_RMID, 0);
    cout << endl << "Shared memory deleted" << endl;
    exit(signum);
}

int main(){
    cout << "Server pid: " << getpid() << endl;
    signal(SIGINT, signalHandler);

    SHMData shmData;
    SHMData *sharedData;

    key_t key = ftok(".", 'S'); 

    if((shmid = shmget(key, sizeof(SHMData), IPC_CREAT | 0666)) == -1){
        perror("Create SHM has been failed");
        exit(1);
    }

    cout << "Shared memory created" << endl;

    if ((sharedData = (SHMData*)shmat(shmid, NULL, 0)) == (void *) -1) {
        perror("shmat error");
        exit(1);
    }

    *sharedData = SHMData(shmData);

    while(true){
        sleep(15);
        set<pid_t> s;
        for(int i = 0; i < sharedData->getSize(); ++i){
            if(sharedData->get(i).getState()){
                s.insert(sharedData->get(i).getPid());
            }else{
                s.erase(sharedData->get(i).getPid());
            }
        }

        string str = "./script.sh";

        for(set<pid_t>::iterator i = s.begin(); i != s.end(); ++i){
            str += " " + to_string(*i);
        }
        cout << endl;
        system(str.c_str());
    };

    return 0;
}

  Kliens

#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
#include "common.h"

using namespace std;

int main(){
    cout << "Client pid: " << getpid() << endl;

    SHMData *sharedData;

    int shmid;    
    key_t key = ftok(".", 'S'); 

    if((shmid = shmget(key, sizeof(SHMData), 0)) == -1){
        perror("Read SHM has been failed.");
        exit(1);
    }

    if ((sharedData = (SHMData*)shmat(shmid, NULL, 0)) == (void *) -1) {
        perror("shmat error");
        exit(1);
    }

    sharedData->addPair(pidStatePair(getpid(), true));
    cout << "Start data writed." << endl;
    int a;
    cin >> a;
    sharedData->addPair(pidStatePair(getpid(), false));
    cout << "Start data writed." << endl;

    return 0;
}

  Common

#include <iostream>
#include <sys/types.h>

using namespace std;

class pidStatePair{
    pid_t pid;
    bool state;

public:

    pidStatePair(){}

    pidStatePair(pid_t pid, bool state){
        this->pid = pid;
        this->state= state;
    }

    pid_t getPid(){
        return pid;
    }

    bool getState(){
        return state;
    }
};

class SHMData{
    int size = 0;
    pidStatePair pairs[100];

public:
    void addPair(pidStatePair pair){
        pairs[size++] = pair;
    }

    int getSize(){
        return size;
    }

    pidStatePair get(int index){
        if(index >= size) return pidStatePair();
        else return pairs[index];
    }
};

  Szemaforok

  Szerver

#include <iostream>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <unistd.h>
#include "common.cpp"

#define N 50

using namespace std;

int main(){

    Playground *sharedData;
    key_t shmkey = ftok(".", 'S'), semkey = ftok(".", 'U');
    int shmid, semid;

    if((shmid = shmget(shmkey, sizeof(Playground), IPC_CREAT | 0666)) == -1){
        perror("Create SHM has been failed");
        exit(1);
    }

    cout << "Shared memory created" << endl;

    if ((sharedData = (Playground*)shmat(shmid, NULL, 0)) == (void *) -1) {
        perror("shmat error");
        exit(1);
    }

    *sharedData = Playground(0);
    
    if((semid = semget(semkey, 1, IPC_CREAT | 0666)) < 0){
        perror("semget error");
        exit(1);
    }

    if (semctl(semid, 0, SETVAL, 1) < 0){
        perror("semctl error");
        exit(1);
    }


    while(true){
  
        sleep(3);
        semcall(semid, -1);
        
        int sum = sharedData->getSum();
        cout << getpid() << " critical phase activated. (Server sum check. SUM = " << sum << ")" << endl;
        if(sum > N){
            //sharedData->kiir();
            shmctl(shmid, IPC_RMID, 0);
            cout << endl << "Shared memory deleted" << endl;
            system("ps -A | awk '$4 == \"client\"{print $1}' | xargs kill");
            semcall(semid, 1);
            exit(0);
        }
        cout << getpid() << " critical phase terminated. (Server)" << endl;
        semcall(semid, 1);
    }
}

  Kliens

#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include "common.cpp"

using namespace std;

int main(){

    srand(getpid());

    Playground *sharedData;
    key_t shmkey = ftok(".", 'S'), semkey = ftok(".", 'U');
    int shmid, semid;

    if((shmid = shmget(shmkey, sizeof(Playground), 0)) == -1){
        perror("Read SHM has been failed.");
        exit(1);
    }

    if ((sharedData = (Playground*)shmat(shmid, NULL, 0)) == (void *) -1) {
        perror("shmat error");
        exit(1);
    }

    if((semid = semget(semkey, 0, 0)) < 0){
        perror("semget error");
        exit(1);
    }

    while(true){
        semcall(semid, -1);
        cout << getpid() << " critical phase activated. (Client)" << endl;
        sharedData->play();
        cout << getpid() << " critical phase terminated. (Client)" << endl;
        semcall(semid, 1);
        sleep(1);
    }

    return 0;
}

  Common

#include <iostream>
#include <stdlib.h>
#include <stdio.h>
#include <sys/sem.h>

#define M 50

using namespace std;

class Playground{
    int p, a[M];

public:
    Playground(int p){
        this->p = p;

        for(int i = 0; i < M; ++i)
            a[i] = 0;
    }

    int getSum(){
        int sum = 0;
        for(int i = 0; i < M; ++i)
            sum += a[i];
        return sum;
    }

    void play(){
        a[p]++;
        p = rand() % (M-1) + 1;
    }

    void kiir(){
      for(int i = 0; i < M; ++i){
        cout << i << ": " << a[i] << endl;
      }
    }

};


static void semcall(int semid, int op) {
  struct sembuf pbuf;
  pbuf.sem_num = 0;
  pbuf.sem_op = op;
  pbuf.sem_flg = 0;

  if (semop(semid, &pbuf, 1) < 0){
        perror("semop error");
        exit(1);
  }
}

  Üzenetsorok

  Szerver

#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "common.h"
#include <array>
#include <string.h>
#include <stdio.h>
#include <memory>

#define KEY 953802

using namespace std;

MESSAGE mesp;
int msgid;

void sendMessage(){
    if (msgsnd(msgid, (struct msgbuf *)&mesp, sizeof(mesp)-sizeof(long), 0) < 0){
perror("msgsnd error");
exit(1);
}
}

void execComm() {
    string comm = "last -f wtmp_2016dec | awk '($3 == \"" + (string)(mesp.mtext) + "\"){print $1 \" - \" $3 \" - \" $5 \" \" $6 \" - \" $10}'";
    int parts = 0;
    array<char, 128> buffer;
    unique_ptr<FILE, decltype(&pclose)> pipe(popen(comm.c_str(), "r"), pclose);

    if (!pipe) {
        perror("popen error");
        exit(1);
    }

    mesp.isOver = false;
    while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) {
        strcpy(mesp.mtext, buffer.data());
        sendMessage();
        parts++;
    }

    if(parts == 0){
        strcpy(mesp.mtext, "No result.");
        sendMessage();
        parts++;
    }

    mesp.isOver = true;
    sendMessage();

    cout << "The response was sent in " << parts << " parts to: " << mesp.mtip << endl << endl;
}

int main(){
    cout << "Server pid: " << getpid() << endl;

    if((msgid = msgget(KEY, IPC_CREAT | 0666)) == -1){
        perror("msgget error");
        exit(1);
    }

    cout << "Message created." << endl << endl;

    while(true){
        if (msgrcv(msgid, (struct msgbuf *)&mesp, sizeof(mesp)-sizeof(long), 1L, 0) < 0){
perror("msgrcv error");
exit(1);
}

        cout << "Message received from: " << mesp.pid << endl;
        cout << "type: " << mesp.mtip << endl;
        cout << "text: " << mesp.mtext << endl << endl;

        mesp.mtip = mesp.pid;
        mesp.pid = getpid();

        execComm();
    }

    return 0;
}

  Kliens

#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "common.h"
#include <string.h>

#define KEY 953802

using namespace std;

int main(int argc, char **argv){


    if(argc != 2){
        cout << "Using: " << argv[0] << " <IP>" << endl;
        exit(1);
    }

    cout << "Client pid: " << getpid() << endl;

    int msgid;
    MESSAGE mesp;
    
    if((msgid = msgget(KEY, 0)) == -1){
        perror("msgget error");
        exit(1);
    }

    cout << "Message queue found." << endl << endl;

    int pid = getpid();

    mesp.mtip = 1;
mesp.pid = pid;
    strcpy(mesp.mtext, argv[1]);

    if (msgsnd(msgid, (struct msgbuf *)&mesp, sizeof(mesp)-sizeof(long), 0) < 0){
perror("msgsnd error");
exit(1);
}

    cout << "Message sent." << endl;
    cout << "type: " << mesp.mtip << endl;
    cout << "pid: " << mesp.pid<< endl;
    cout << "text: " << mesp.mtext << endl << endl;

    cout << "Response:" << endl;
while(true){
        if (msgrcv(msgid, (struct msgbuf *)&mesp, sizeof(mesp)-sizeof(long), pid, 0) < 0){
            perror("msgrcv error");
            exit(1);
        }

        if(!mesp.isOver){
            cout << mesp.mtext;
        }else break;
    }
    cout << endl;

    return 0;
}

  Common

typedef struct {
long mtip;
int pid;
bool isOver;
char mtext[256];
} MESSAGE;

  POSIX-Solaris szálak szinkronizálás nélkül

#include <iostream>
#include <pthread.h>
#include <stdlib.h>
#include <memory>
#include <unistd.h>

using namespace std;

template <class T>
class CircularBuffer {
private:
    volatile int maxSize, head = -1, tail = -1;
    volatile T *a;

public:
    ~CircularBuffer(){
        delete[] a;
    }

    void initialize(int maxSize){
        this->maxSize = maxSize;
a = new T[maxSize];
    }

void push(T n) {
if (isFull()) throw runtime_error("Buffer is full.");
if (head == -1) head = tail = 0;
else tail = tail == maxSize - 1 ? 0 : tail + 1;
        a[tail] = n;
}

T pop() {
if (isEmpty()) throw runtime_error("Buffer is empty.");
        int r = head;
        if (head == tail) head = tail = -1;
        else head = head == maxSize - 1 ? 0 : head + 1;
        return a[r];
}

    bool isEmpty(){
        return head == -1;
    }

    bool isFull(){
        return (head == 0 && tail == maxSize - 1) || (head == tail + 1);
    }
};

CircularBuffer<int> cb;
volatile bool isOver = false, lock = false;
volatile int empty_slot = 0, full_slot = 0;

void *produce(void *threadId){
    long id = (long) threadId;

    while(!isOver){
        usleep(rand()%3000000);
        if(!lock){
            lock = true;

            if(empty_slot != 0){
                try{
                    cb.push(1);
                }catch(runtime_error e){
                    cout << "ERROR in producer thread: " << id << " " << e.what() << endl;
                    exit(-1);
                }

                empty_slot--;
                full_slot++;
                cout << "Producer thread " << id << " produced." << endl;
            }

            lock = false;
        }
    }
            
    pthread_exit(NULL);
}

void *consume(void *threadId){
    long id = (long) threadId;

    while(!isOver){
        usleep(rand()%3000000);
        if(!lock){
            lock = true;

            if(full_slot != 0){
                try{
                    cb.pop();
                }catch(runtime_error e){
                    cout << "ERROR in consumer thread: " << id << " " << e.what() << endl;
                    exit(-1);
                }

                empty_slot++;
                full_slot--;
                cout << "Consumer thread " << id << " consumed." << endl;
            }

            lock = false;
        }
    }

    pthread_exit(NULL);
}

int main(int argc, char *argv[]){

    if(argc != 2){
        cout << "Usage: " << argv[0] << " puffer_size" << endl;
        exit(0);
    }

    empty_slot = atoi(argv[1]);
    cb.initialize(empty_slot);

    srand(time(NULL));
    int producerCount = rand()%26+5;
    int consumerCount = producerCount;
    //int consumerCount = rand()%26+5;

    cout << "Producer count: " << producerCount << endl;
    cout << "Consumer count: " << consumerCount << endl << endl;

    pthread_t *producerThreads = new pthread_t[producerCount];
    pthread_t *consumerThreads = new pthread_t[consumerCount];


    for(long i = 0; i < producerCount; ++i){
        cout << "Creating producer thread: " << i << endl;
        if(pthread_create(&producerThreads[i], NULL, produce, (void *)i)){
            perror("Error in thread creating.");
            exit(-1);
        }
    }

    for(long i = 0; i < consumerCount; ++i){
        cout << "Creating consumer thread: " << i << endl;
        if(pthread_create(&consumerThreads[i], NULL, consume, (void *)i)){
            perror("Error in thread creating.");
            exit(-1);
        }
    }

    usleep(10000000);
    isOver = true;

    for(int i = 0; i < producerCount; ++i){
        if(pthread_join(producerThreads[i], NULL)){
            perror("Error in thread joining.");
            exit(-1);
        }
    }

    for(int i = 0; i < consumerCount; ++i){
        if(pthread_join(consumerThreads[i], NULL)){
            perror("Error in thread joining.");
            exit(-1);
        }
    }

    cout << "Every thread has been terminated." << endl;
    
    return 0;
}

  POSIX-Solaris szálak szinkronizálással

#include <iostream>
#include <pthread.h>
#include <stdlib.h>
#include <memory>
#include <unistd.h>

#define THREAD_COUNT 2

using namespace std;

template <class T>
class CircularBuffer {
private:
    int maxSize, head = -1, tail = -1;
    T *a;

public:
    ~CircularBuffer(){
        delete[] a;
    }

    void initialize(int maxSize){
        this->maxSize = maxSize;
a = new T[maxSize];
    }

void push(T n) {
if (isFull()) throw runtime_error("Buffer is full.");
if (head == -1) head = tail = 0;
else tail = tail == maxSize - 1 ? 0 : tail + 1;
        a[tail] = n;
}

T pop() {
if (isEmpty()) throw runtime_error("Buffer is empty.");
        int r = head;
        if (head == tail) head = tail = -1;
        else head = head == maxSize - 1 ? 0 : head + 1;
        return a[r];
}

    bool isEmpty(){
        return head == -1;
    }

    bool isFull(){
        return (head == 0 && tail == maxSize - 1) || (head == tail + 1);
    }
};

CircularBuffer<int> cb;
pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condProduce = PTHREAD_COND_INITIALIZER;
pthread_cond_t condConsume = PTHREAD_COND_INITIALIZER;

void *produce(void *threadId){
    long id = (long) threadId;

    for(int i = 0; i < 20; ++i){
        usleep(rand()%3000000);
        pthread_mutex_lock(&mut);

        while(cb.isFull()){
            cout << "Producer thread " << id << " is waiting." << endl;
            pthread_cond_wait(&condProduce, &mut);
            if(!cb.isFull()) break;
        }

        try{
            cb.push(1);
        }catch(runtime_error e){
            cout << "ERROR in consumer thread: " << id << " " << e.what() << endl;
            exit(-1);
        }

        cout << "Producer thread " << id << " produced." << endl;

        pthread_cond_signal(&condConsume);
        pthread_mutex_unlock(&mut);
    }
            
    pthread_exit(NULL);
}

void *consume(void *threadId){
    long id = (long) threadId;

    for(int i = 0; i < 20; ++i){
        usleep(rand()%3000000);
        pthread_mutex_lock(&mut);

        while(cb.isEmpty()){
            cout << "Consumer thread " << id << " is waiting." << endl;
            pthread_cond_wait(&condConsume, &mut);
            if(!cb.isEmpty()) break;
        }

        try{
            cb.pop();
        }catch(runtime_error e){
            cout << "ERROR in consumer thread: " << id << " " << e.what() << endl;
            exit(-1);
        }

        cout << "Consumer thread " << id << " consumed." << endl;

        pthread_cond_signal(&condProduce);
        pthread_mutex_unlock(&mut);
        
    }

    pthread_exit(NULL);
}

int main(int argc, char *argv[]){

    if(argc != 2){
        cout << "Usage: " << argv[0] << " puffer_size" << endl;
        exit(0);
    }

    cb.initialize(atoi(argv[1]));

    srand(time(NULL));
    int producerCount = rand()%26+5;
    int consumerCount = producerCount;
    //int consumerCount = rand()%26+5;

    cout << "Producer count: " << producerCount << endl;
    cout << "Consumer count: " << consumerCount << endl << endl;

    pthread_t *producerThreads = new pthread_t[producerCount];
    pthread_t *consumerThreads = new pthread_t[consumerCount];


    for(long i = 0; i < producerCount; ++i){
        cout << "Creating producer thread: " << i << endl;
        if(pthread_create(&producerThreads[i], NULL, produce, (void *)i)){
            perror("Error in thread creating.");
            exit(-1);
        }
    }

    for(long i = 0; i < consumerCount; ++i){
        cout << "Creating consumer thread: " << i << endl;
        if(pthread_create(&consumerThreads[i], NULL, consume, (void *)i)){
            perror("Error in thread creating.");
            exit(-1);
        }
    }

    for(int i = 0; i < producerCount; ++i){
        if(pthread_join(producerThreads[i], NULL)){
            perror("Error in thread joining.");
            exit(-1);
        }
    }

    for(int i = 0; i < consumerCount; ++i){
        if(pthread_join(consumerThreads[i], NULL)){
            perror("Error in thread joining.");
            exit(-1);
        }
    }

    cout << "Every thread has been terminated." << endl;
    
    return 0;
}

 


Copyright (C) Buzogány László, 2002

About