Garmaine Staff asked 2 years ago

Another try with getting parallel processes to work. Please excuse the amount of code but every attempt to shorten it makes the error vanish.

What I tested so far:

  • sending int from parent to child, from child to parent, and from parent to child and then back: works
  • processing a list of int: send from parent to child, modify and back to parent: works
  • more data: int + string, from parent to child, modify and back to parent: works
  • a list of data the same way: works

But when I run the same function that works a second time it always fail.

This is the function that creates the child process:

//parent sends binary data from list to child which sends back modified data
bool processParallel6(std::vector<std::pair<int, std::string>> & data)
{
    //define pipe
    int parent2Child[2];
    int child2Parent[2];

    //create pipe
    pipe(parent2Child);
    pipe(child2Parent);

    //fork
    pid_t child = fork();

    if(child == 0) //child process
    {
        //close not needed end of pipe
        close(parent2Child[1]);
        close(child2Parent[0]);

        for(;;)
        {
            struct pollfd pfd;
            pfd.fd = parent2Child[0];
            pfd.events = POLLIN;

            //wait until data is available at the pipe
            cout << "c: poll ..." << endl;
            if(poll(&pfd, 1, -1) < 0)
            {
                cout << "c: poll: " << strerror(errno) << endl;
                exit(-1);
            }
            cout << "c: poll says there are data" << endl;

            if((pfd.revents&POLLIN) == POLLIN)
            {
                int data;
                std::string text;
                if(!readData3(parent2Child[0], data, text))
                    exit(-2);
                cout << "c: data received: " << data << " " << text <<  endl;

                if(data == -1)
                    break;

                if(!writeData3(child2Parent[1], data * 2, text + text))
                    exit(-3);
                cout << "c: sent data to parent: " << 2 * data << " " << text + text << endl;
            }
        }

        close(parent2Child[0]);
        close(child2Parent[1]);
        exit(0);
    }
    else //parent process
    {
        //close not needed end of pipe
        close(parent2Child[0]);
        close(child2Parent[1]);

        //send data to child
        if(!writeData3(parent2Child[1], data.back().first, data.back().second))
            return false;
        cout << "p: wrote data: " << data.back().first << " " << data.back().second << endl;
        data.pop_back();

        //read result from child
        for(;;)
        {
            struct pollfd pfd;
            pfd.fd = child2Parent[0];
            pfd.events = POLLIN;

            //wait until data is available at the pipe
            cout << "p: poll ..." << endl;
            if(poll(&pfd, 1, -1) < 0)
            {
                cout << "p poll: " << strerror(errno) << endl;
                return false;
            }
            cout << "p: poll says there are data" << endl;

            if((pfd.revents&POLLIN) == POLLIN)
            {
                int data;
                std::string text;
                if(!readData3(child2Parent[0], data, text))
                    return false;
                cout << "p: data received: " << data << " " << text << endl;
            }

            if(data.empty())
                break;

            if(!writeData3(parent2Child[1], data.back().first, data.back().second))
                return false;
            cout << "p: wrote data: " << data.back().first << " " << data.back().second << endl;
            data.pop_back();
        }

        //send stop data
        if(!writeData3(parent2Child[1], -1, "notext"))
            return false;
        cout << "p: sent stop data " << endl;

        //wait for child to end
        wait(nullptr);

        //close all pipes
        close(parent2Child[1]);
        close(child2Parent[0]);
    }

    return true;
}

For reading and writing data I use this two functions:

bool readData3(int fd, int & number, std::string & text)
{
    char numberBuf[sizeof(int)];
    int bytesRead = read(fd, numberBuf, sizeof(int));
    if(bytesRead > 0)
    {
        number = *(int *)numberBuf;
    }
    else if(bytesRead < 0)
    {
        cout << "readData3: " << strerror(errno) << endl;
        return false;
    }

    char sizeBuf[sizeof(int)];
    int size = -1;
    bytesRead = read(fd, sizeBuf, sizeof(int));
    if(bytesRead > 0)
    {
        size = *(int *)sizeBuf;
    }
    else if(bytesRead < 0)
    {
        cout << "readData3: " << strerror(errno) << endl;
        return false;
    }

    char textBuf[size];
    bytesRead = read(fd, textBuf, size);
    if(bytesRead > 0)
    {
        text = std::string(textBuf);
    }
    else if(bytesRead < 0)
    {
        cout << "readData3: " << strerror(errno) << endl;
        return false;
    }
    return true;
}

bool writeData3(int fd, const int number, const std::string text)
{
    int bytesWritten = write(fd, &number, sizeof(int));
    if(bytesWritten < 0)
    {
        cout << "writeData3: " << strerror(errno) << endl;
        return false;
    }

    int size = text.size() + 1;
    bytesWritten = write(fd, &size, sizeof(int));
    if(bytesWritten < 0)
    {
        cout << "writeData3: " << strerror(errno) << endl;
        return false;
    }

    bytesWritten = write(fd, text.c_str(), size);
    if(bytesWritten < 0)
    {
        cout << "writeData3: " << strerror(errno) << endl;
        return false;
    }

    return true;
}

Finally I run it like this:

#include <iostream>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <bitset>
#include <memory>
#include <poll.h>
#include <cstring>

using namespace std;

int main(int /*argc*/, char* /*argv*/[])
{
    std::vector<std::pair<int, std::string>> data;
    data.push_back(std::make_pair(1, "one"));
    data.push_back(std::make_pair(2, "two"));
    cout << "6a ########################################################" << endl << flush;
    processParallel6(data);
    cout << "6b ########################################################" << endl << flush;
    processParallel6(data);

    return 0;
}

This is the output:

6a ###############################################
p: wrote data: 2 two
p: poll ...
c: poll ...
c: poll says there are data
c: data received: 2 two
p: poll says there are data
p: data received: 4 twotwo
p: wrote data: 1 one
p: poll ...
c: sent data to parent: 4 twotwo
c: poll ...
c: poll says there are data
c: data received: 1 one
p: poll says there are data
p: data received: 2 oneone
p: sent stop data 
c: sent data to parent: 2 oneone
c: poll ...
c: poll says there are data
c: data received: -1 notext
6b ###################################################
c: poll ...
terminate called after throwing an instance of 'std::logic_error'
  what():  basic_string::_M_construct null not valid
c: poll says there are data
c: poll ...
c: poll says there are data
c: poll ...

The last 4 lines are repeated a thousands of times. This output comes most of the times, but sometimes I have seen a std::bad_alloc error. When I try strace it crashes too, but when it runs I have seen directly after the second run of processParallel6() a line with mmap, ENOEM and 'Cannot allocate memory'

What happens here? Why is it working the first time, but not the second time?