Cprocess_clients.cc

Go to the documentation of this file.
00001 
00008 extern "C"
00009 {
00010   #include <sys/times.h>
00011 }
00012 
00013 static inline double difftimes(clock_t t1, clock_t t0)
00014 {
00015   // refer "man 2 times"
00016   return static_cast<double>(t1-t0)/static_cast<double>(sysconf(_SC_CLK_TCK));
00017 }
00018 
00019 
00020 void Cprocess_clients::process_data_stream_ecm(const connection_waiter& my_connection_waiter)
00021 {
00022   streampos fpos = Recovery_from_file.tellg();
00023   int Number_of_curves = 0;
00024   string s;
00025 
00026   srand(time(NULL)); // initialize random-seed (time-function should be random enough for our purpose)
00027 
00028   Recovery_from_file >> Number_of_curves;
00029   Recovery_from_file.ignore(1,'\n'); // read over eol
00030 
00031   while (Number_of_curves<elcu_Kurven)
00032   {
00033     unix_io_stream daten(my_connection_waiter); // wait for connection and connect to stream
00034 
00035     daten >> s;
00036 
00037     if (s == "ECM?")
00038      {
00039        int ecm_sigma = rand();
00040        daten << ecm_sigma << " "
00041              << elcu_Phase1 << " "
00042              << elcu_Phase2 << " "
00043              << n << endl; // transmit sigma and n
00044 
00045        Number_of_curves++;
00046        cout_network << "elliptic curve #" << Number_of_curves << "/" << elcu_Kurven
00047             << " with sigma=" << ecm_sigma
00048             << " sent to " << peer_info(daten) << endl;
00049 
00050        // write number of finished curves to recovery-file...
00051        Recovery_to_file.seekp(fpos); 
00052        Recovery_to_file << Number_of_curves << endl << flush;
00053        ecm_curves_processed=Number_of_curves;
00054 
00055        daten << flush;
00056        continue;
00057      }
00058 
00059     if (s == "Faktor(ECM)!") // accept factors
00060       {
00061         cout_network << "Factor received from " << peer_info(daten) << endl;
00062         int sigma;
00063         mpz_t x;
00064         mpz_init(x);
00065         daten >> sigma >> x; // get info about curve (sigma) and get factor
00066         while (isspace(daten.peek())) daten.get();
00067         string ecm_s = "ecm";
00068         if (daten.peek()!=EOF) daten >> ecm_s;
00069         mpz_gcd(x,x,n); // compute gcd (it may happen, that the factor has been already discovered!)
00070         if (mpz_cmp_ui(x,1)!=0)
00071          {
00072            // theoretically the factor can divide multiple times...
00073            const unsigned int exponent = mpz_remove(n,n,x);
00074            if (mpz_probab_prime_p(x,probab_prime_checks))
00075              {
00076                cout << x << " is factor." << endl;
00077                if (mpz_sizeinbase(x,10)<28)
00078                 {
00079                   ostringstream comment;
00080                   comment << " [" << ecm_s << "]";
00081                   Factorization_to_file << MAL(x,exponent,comment) << flush;
00082                 }
00083                else
00084                 {
00085                   ostringstream comment;
00086                   comment << " [" << ecm_s << ",sigma=" << sigma << "]";
00087                   Factorization_to_file << MAL(x,exponent,comment) << flush;
00088                 }
00089              }
00090            else
00091              {
00092                cout << x << " is a composite factor." << endl;
00093                if (mpz_probab_prime_p(n,probab_prime_checks))
00094                 {
00095                   if (exponent>1) mpz_pow_ui(x,x,exponent);
00096                   mpz_swap(n,x);
00097                   cout << x << " is factor. (factorswap)" << endl;
00098                   ostringstream comment;
00099                   comment << " [" << ecm_s << "/factorswap,sigma=" << sigma << "]";
00100                   Factorization_to_file << MAL(x,comment) << flush;
00101                 }
00102                else
00103                 {
00104                   if (mpz_sizeinbase(x,10)<28)
00105                    {
00106                      ostringstream comment;
00107                      comment << " [" << ecm_s << "] [composite]";
00108                      Factorization_to_file << MAL(x,exponent,comment) << flush;
00109                    }
00110                   else
00111                    {
00112                      ostringstream comment;
00113                      comment << " [" << ecm_s << ",sigma=" << sigma << "] [composite]";
00114                      Factorization_to_file << MAL(x,exponent,comment) << flush;
00115                    }
00116                 }
00117              }
00118          }
00119         mpz_clear(x);
00120         daten << flush;
00121 
00122         if (mpz_probab_prime_p(n,probab_prime_checks))
00123           {
00124             Factorization_to_file << MAL(n);
00125             cout << "remaining factor: " << n << endl;
00126             cout << "remaining factor is most probably prime!" << endl;
00127             mpz_set_ui(n,1); // n ist faktorisiert
00128           }
00129 
00130         if ( (mpz_cmp_ui(n,1)==0) || Potenztest(n) ) // factorization complete?
00131           { 
00132             cout << "factorization successfully completed." << endl;
00133             Factorization_to_file << endl;
00134             ExitManager::StopFactorization(); // exit program (success!) and do final work...
00135           }
00136         tune_parameters(mpz_sizeinbase(n,10)); // as the name says: tune parameters
00137 
00138         // adapt changes to the recovery-file!
00139         // number to factorize has lowered!
00140         Recovery_buffer.close(); // close file
00141         Recovery_buffer.open(RecoveryFile.c_str(),ios::in|ios::out|ios::trunc); // re-open it (creating an empty file!)
00142         Recovery_to_file << n << endl; // write out new number to factorize
00143         fpos = Recovery_from_file.tellg(); // mark fileposition
00144         Recovery_to_file << Number_of_curves << endl << flush; // store number of finished curves (ecm)
00145         continue;
00146       }
00147 
00148     if (s == QsieveLogon)
00149      {
00150        // requesting MPQS parameters is not adequate while processing ecm!
00151        cerr << "Inadequate request of MPQS parameters from " << peer_info(daten) << endl;
00152        daten << -1 << " " << "please wait until ecm is processed!" << endl;
00153        continue;
00154      }
00155 
00156     cerr << "Invalid request \"" << s << "\""
00157          << " from " << peer_info(daten) << endl;
00158     daten << flush;
00159   }
00160   cout << "distributed ECM-Phase (server) completed." << endl;
00161 }
00162 
00163 
00164 
00165 // server action, when an invalid seek occurs in one of the combine methods
00166 void Cprocess_clients::seek_emergency_handler(istream &in, const streampos pos)
00167 {
00168   // istream is probably a read-only-stream for DynamicRelations::DynamicRelations_to_file;
00169   // for performance reasons we do not flush DynamicRelations_to_file after each write,
00170   // so it may occur, that this is causing the seek failure...
00171   // we can take appropriate action:
00172 
00173   cerr << "invalid seek position on istream, trying to resolve..." << endl;
00174 
00175   // first we try to get into the critical section to obtain the right to flush the ostream
00176   // if this fails, we wait few seconds and retry (maybe another thread has triggered flushing the ostream)
00177 
00178   in.clear();
00179   CCriticalSection CriticalSection(ServerMutex,false);
00180   if (CriticalSection.try_enter())
00181    {
00182      DynamicRelations::DynamicRelations_to_file.flush();
00183      CriticalSection.leave();
00184      in.seekg(pos); 
00185      if (in.peek()!=EOF) return; // good, problem is most probably resolved!
00186    }
00187   else
00188    {
00189      // since we're in a multithreaded environment, it may happen, that another thread flushes within
00190      // the next few seconds, so let's wait a little bit and retry
00191      sleep(2); in.seekg(pos); 
00192      if (in.peek()!=EOF) return; // good, problem is most probably resolved! (but it may still happen, that data is only written partially)
00193 #if 1
00194      cerr << "invalid seek position on istream not resolved, trying to flush ostream without obtaining a lock" << endl;
00195      // since we have no chance to check whether we're inside (=the owner) of the critical section,
00196      // we flush the DynamicRelations::DynamicRelations_to_file without any locking...
00197      // if we're lucky, this works; if not, the server may crash :-(
00198      // cross fingers and give it a try ;-)
00199      in.clear(); DynamicRelations::DynamicRelations_to_file.flush(); in.seekg(pos);
00200      if (in.peek()!=EOF) return; // good, problem is most probably resolved!
00201 #endif
00202     }
00203   // giving up:
00204   throw ios_base::failure("unresolvable invalid seek position on istream");
00205 }
00206 
00207 CCriticalSection::Mutex Cprocess_clients::ServerMutex;
00208 
00209 void Cprocess_clients::process_data_stream(unix_io_stream &daten, const string client)
00210 {
00211   string s;
00212   daten >> s;
00213 
00214   CCriticalSection CriticalSection(ServerMutex,false);
00215    // this function is a "critical section": since we are in an multithreaded environment,
00216    // we need to have control, who is entering and leaving this!
00217    // Only one thread at a time may be allowed to stay inside the critical section.
00218    // Permission to enter this section is controlled by a mutex mechanism.
00219 
00220   if (s == QsieveLogon)
00221     {
00222       cout_network << "Sending number to factor to " << client << endl;
00223       daten << n << " " << StaticFactorbase::Size() << " " 
00224             << Factor_Threshold << " " << LogicalSieveSize << endl;
00225       return;
00226     }
00227 
00228   if (s == "ECM?")
00229    {
00230      daten << "-1" << endl; // do not accept any elliptic curves requests anymore (-> sigma<0)
00231    }
00232 
00233   if (s == "Faktor(ECM)!") // but accept factors...
00234     {
00235       cout_network << "Factor received from " << client << endl;
00236       int sigma;
00237       mpz_t x;
00238       mpz_init(x);
00239       daten >> sigma >> x; // get info about curve (sigma) and get factor
00240       while (isspace(daten.peek())) daten.get();
00241       string ecm_s = "ecm";
00242       if (daten.peek()!=EOF) daten >> ecm_s;
00243       mpz_gcd(x,x,n); // compute gcd (it may happen, that the factor has been already discovered!)
00244       if (mpz_cmp_ui(x,1)==0) // does the "factor" still divide?
00245        { mpz_clear(x); return; } // "factor" doesn't divide (anymore)
00246 
00247       CriticalSection.enter();
00248 
00249       if (mpz_cmp_ui(x,1)!=0)
00250        {
00251          // theoretically the factor can divide multiple times...
00252          const unsigned int exponent = mpz_remove(n,n,x);
00253          if (mpz_probab_prime_p(x,probab_prime_checks))
00254            {
00255              cout << x << " is factor." << endl;
00256              if (mpz_sizeinbase(x,10)<28)
00257               {
00258                 ostringstream comment;
00259                 comment << " [" << ecm_s << "]";
00260                 Factorization_to_file << MAL(x,exponent,comment) << flush;
00261               }
00262              else
00263               {
00264                 ostringstream comment;
00265                 comment << " [" << ecm_s << ",sigma=" << sigma << "]";
00266                 Factorization_to_file << MAL(x,exponent,comment) << flush;
00267               }
00268            }
00269          else
00270            {
00271              cout << x << " is a composite factor." << endl;
00272              if (mpz_sizeinbase(x,10)<28)
00273               {
00274                 ostringstream comment;
00275                 comment << " [" << ecm_s << "] [composite]";
00276                 Factorization_to_file << MAL(x,exponent,comment) << flush;
00277               }
00278              else
00279               {
00280                 ostringstream comment;
00281                 comment << " [" << ecm_s << ",sigma=" << sigma << "] [composite]";
00282                 Factorization_to_file << MAL(x,exponent,comment) << flush;
00283               }
00284            }
00285        }
00286       mpz_clear(x);
00287 
00288       if (mpz_probab_prime_p(n,probab_prime_checks))
00289         {
00290           Factorization_to_file << MAL(n);
00291           cout << "remaining factor: " << n << endl;
00292           cout << "remaining factor is most probably prime!" << endl;
00293           mpz_set_ui(n,1); // n ist faktorisiert
00294         }
00295 
00296       if ( (mpz_cmp_ui(n,1)==0) || Potenztest(n) ) // factorization complete?
00297         { 
00298           cout << "factorization successfully completed." << endl;
00299           Factorization_to_file << endl;
00300           ExitManager::StopFactorization(); // exit program (success!) and do final work...
00301         }
00302 
00303       /* if program flow is at this position, then:
00304            A client (using ecm) has detected a new factor resulting in a
00305            partial factorization during the sieving phase. This leads to severe
00306            consequences:
00307             - We shouldn't ignore the new factor.
00308             - We cannot continue sieving.
00309             - abort the factorization and delete recovery data
00310               (-> recovery does not fit for the remaining number)
00311             - let the user restart with the remaining number.
00312       */
00313        {
00314           // we abort the factorization and print a remark,
00315           // that the remaining cofactor is still composite...
00316 
00317           Factorization_to_file << MAL(n) << " [composite]" << endl;
00318           cout << "remaining factor: " << n << endl
00319                << "remaining factor is composite!" << endl;
00320           cout << "factorization needs to be stopped to avoid inconsistent files." << endl
00321                << "please restart factorization with remaining number." << endl;
00322 
00323           Factorization_to_file << endl;
00324           ExitManager::StopFactorization(); // exit program (success)
00325         }
00326                  
00327       return;
00328     }
00329 
00330   if (s=="DynamicFactors?_ab_index")
00331     {
00332       // Online-clients should use this one to fetch dynamic factors.
00333       cout_network << "new-style dynamic factor request from " << client << endl;
00334 
00335       int i;
00336       daten >> i;
00337       const int start = i;
00338 
00339       cout_network << "Sending dynamic factors starting with index " << i
00340                    << " to " << client << endl;
00341       for (int j=0; j<20; ++j)
00342        { 
00343          const int i_start = i;
00344          while (i<DynamicFactorRelations.monitoredSize())
00345           {
00346             // binary transmission of data to increase throughput.
00347             // important: transmit data in a defined byteorder to be compatible!
00348             register unsigned int h=DynamicFactorRelations[i];
00349             char c[4];
00350             c[0]=h&0xff; c[1]=(h>>8)&0xff; c[2]=(h>>16)&0xff; c[3]=(h>>24)&0xff;
00351             daten.write(c,4); if (daten.fail()) break;
00352             //daten << DynamicFactorRelations[i] << " ";
00353             ++i;
00354           }
00355          //cerr << " ---- " << client << " ----  j = " << j << " -------" << endl;
00356          pthread_testcancel();
00357          if (i>i_start)
00358           { 
00359             daten.flush();
00360             j=0;
00361           }
00362          if (daten.fail()) break;
00363          sleep(2);
00364          pthread_testcancel();
00365        }
00366       //daten << "-1" << endl;
00367       if (daten.fail()) cout_network << "dynamic factors thread: transmission failure!" << endl;
00368       else
00369        {
00370          daten.put(0); daten.put(0); daten.put(0); daten.put(0); daten.flush();
00371        }
00372       cout_network << "Dynamic factors from " << start << " to " << i-1
00373                    << " sent to " << client << "." << endl;
00374       if (!daten.fail())
00375        {
00376          // read over remaining/redundant data
00377          for(int i=0; i<4; ++i)
00378           if (daten.peek()!=EOF)
00379            {
00380              int h=daten.get();
00381              cerr << "unexpected value (" << h << ") instead of EOF on stream!" << endl;
00382            }
00383        }
00384       cout_network << "Connection to " << client << " closed." << endl;
00385       return;
00386     }
00387   
00388   if (s=="DynamicFactors?_ab_fpos")
00389     {
00390       cout_network << "old-style dynamic factor request from " << client << endl;
00391       // this feature is deprecated!
00392       // Online-clients should use the above one to fetch dynamic factors.
00393       // However, for offline clients and downwards compatibility with
00394       // client-versions up to 2.92 this feature is still provided...
00395 
00396       //streampos startfpos; // this is meant
00397       long long int startfpos; // and this will compile...
00398       daten >> startfpos;
00399 
00400       CriticalSection.enter();
00401 
00402       ostringstream temp_stringstream; // temporary to avoid long process locks
00403       cout_network << "Generating dynamic factors starting with filepos " << startfpos << endl;
00404       const ostream::pos_type saved_fpos = DynamicRelations_to_file.tellp();
00405       DynamicRelations_to_file.seekp(0,ios::end);
00406       const ostream::pos_type fpos_to_transmit = DynamicRelations_to_file.tellp(); // use variable, because this is safer for exception handling
00407       DynamicRelations_to_file.seekp(saved_fpos);
00408 
00409 #ifdef STL_STREAM_workaround
00410       daten << static_cast<streamoff>(fpos_to_transmit) << " ";
00411 #else
00412       daten << fpos_to_transmit << " ";
00413 #endif
00414       
00415       {
00416         CUnlockMutexAtDestruction SLP_Lock(DynamicFactorRelations.SLP_mutex);
00417         DynamicFactorRelations.SLP_mutex.lock();
00418         // this section can only proceed, if DynamicFactorRelations are locked!!
00419         // -> you should really try to avoid using deprecated protocol feature,
00420         //    as it is ugly slow!!   
00421 
00422         int prev=0; // transmit difference between numbers instead numbers theirselves
00423         for (TDynamicFactorRelations::const_iterator pos=DynamicFactorRelations.begin();
00424              pos != DynamicFactorRelations.end() && pos->sieveable(); ++pos)
00425 #ifdef STL_STREAM_workaround
00426           if (static_cast<streamoff>((*pos).fpos)>=startfpos)
00427 #else
00428           if ((*pos).fpos>=startfpos)
00429 #endif
00430             {
00431               temp_stringstream << (*pos).factor-prev << " ";
00432               prev=(*pos).factor;
00433             }
00434       }
00435 
00436       const unsigned int KB = temp_stringstream.str().length()/1024;
00437       cout_network << "Sending dynamic factors starting with fileposition " << startfpos
00438                    << " (" << KB << "KB)" <<endl;
00439 
00440       CriticalSection.leave(); // allow other threads to sneak in
00441 
00442       if (!temp_stringstream.str().empty()) daten << temp_stringstream.str();
00443       if (daten.fail()) cout_network << "dynamic factors thread (old-style): transmission failure!" << endl;
00444       else daten << "-1" << endl;
00445 
00446       cout_network << "Connection (old-style) to " << client << " closed." << endl;
00447       return;
00448     }
00449   
00450   if (s=="Polynom?")
00451     {
00452       static CCriticalSection::Mutex myPolynomMutex;
00453       int intervall;
00454       daten >> intervall;
00455       cout_network << "Sending new MPQS interval (" << intervall << ")"
00456                    << " to " << client << endl;
00457 
00458       CCriticalSection CriticalPolynomSection(myPolynomMutex);
00459       // the critical section for serving MPQS polynomial data starts here...
00460 
00461       Polynom.save(daten); // Intervallanfang
00462       Polynom.compute_next_polynomial(intervall);
00463       Polynom.save(daten); // end of interval
00464       
00465       // for recovery:
00466       streampos fpos = Recovery_to_file.tellp();
00467       Polynom.save(Recovery_to_file); // save current mpqs polynomial
00468       Recovery_to_file.flush(); // be cautious...
00469       Recovery_to_file.seekp(fpos);
00470       return;
00471     }
00472 
00473   // LAZY-Variante (ersetzt die nunmehr obsolete BUSY-Variante,
00474   // die bis einschließlich Version 2.92 als Option noch implementiert war):
00475   // Wenn die empfangenen Relationen Korrekturfaktoren enthalten, werden
00476   // diese erst dann ausgewertet, wenn eine statische Relation entsteht.
00477   // Diese Variante ist insbesondere für das Client-Server-Modell
00478   // implementiert, da das relativ einfach ist und der Server alle
00479   // Relationen schnell entgegennehmen soll, was in der bisherigen
00480   // BUSY-Variante bei größeren Gleichungssystemen nicht mehr gewährleistet
00481   // werden konnte. Außerdem dürfte die LAZY-Variante Plattenplatz sparen.
00482   // Nachteil: Der Check auf defektierende Relationen wird nun
00483   // unverhältnismäßig aufwendig. (Per Induktion gilt zwar: Wenn jede Relation
00484   // getestet wird, dann reicht es, alle Faktoren (einschließlich der 
00485   // Korrekturfaktoren) und den quadrierten Restfaktor zusammenzumultiplizieren
00486   // und das Produkt modulo n auf 1 zu testen. Die Korrektheit der zu den
00487   // Korrekturfaktoren gehörigen Relationen ergibt sich dann durch einen
00488   // Existenzcheck der zugehörigen Relation (und der Induktionsannahme, dass
00489   // diese korrekt ist). Dieser Test ist also einfach zu implementieren, aber
00490   // während in der Busy-Variante der Test nicht allzu sehr ins Gewicht fällt,
00491   // nimmt er hier einen großen Teil der Verarbeitungszeit ein, da die
00492   // Relation nun nicht mehr als Zeichenkette verarbeitet werden kann, sondern
00493   // tatsächlich ausgewertet werden muß. Gerade aber die Auswertung sollte
00494   // durch die LAZY-Methode auf das notwendige Maß beschränkt werden.
00495   //
00496   // policy for LAZY-variant:
00497   //  1. receive relation as a string of characters
00498   //  2. analyse relation (determine factor.Type)
00499   //  3. if static relation -> proceed as normal (BUSY)
00500   //  4. call analysis procedures (as normal)
00501   //  5. save string to the appropriate file
00502   if (s == "NewRelations!")
00503     {
00504       daten.set_TCP_NODELAY(); // for faster communication
00505       string ClientAccount = client.substr(0,client.find_first_of(' ')); // default account name for statistical accounting
00506 
00507       {
00508         // verify, that relations belong to our factorization
00509         mpz_t received_kN;
00510         mpz_init(received_kN);
00511         daten >> received_kN;
00512         if (daten.fail() || mpz_cmp(received_kN,kN)!=0)
00513          {
00514            daten.clear(daten.rdstate() & ~std::ios::failbit);
00515            daten << "VOID_number!!" << endl;
00516            return;
00517          }
00518         else daten << "proceed" << endl;
00519         mpz_clear(received_kN);
00520 
00521         daten >> s;
00522         if (s=="Account:")
00523          {
00524            // clients want to register with an alias account name
00525            s=read_restrictedName(daten); // needed for security reasons to prevent escape sequences or similar bad things!
00526            if (s!="")
00527             {
00528               ClientAccount=s; // okay, use this alias
00529 #ifdef VERBOSE_NOTICE
00530               cout << client << " registered as \"" << ClientAccount << "\"" << endl;
00531 #endif
00532             }
00533            daten >> s; // and now we expect to read keyword for getting a relation
00534          }
00535         if (s!="RL") // new token for "Relation!"
00536          {
00537            cerr << "expected block of relations, but got nonsense!" << endl;
00538            return;
00539          }
00540       }
00541 
00542       bool Relationsblock_okay = true;
00543       int received_relations_counter = 0;
00544       
00545       statistical_data::AllClientStats_Mutex.lock();
00546       statistical_data::CClientStats &ClientStats
00547        = statistical_data::AllClientStats[ClientAccount];
00548       statistical_data::AllClientStats_Mutex.unlock();
00549       // generate (or access) and reference a statistical "worksheet" for
00550       // the specified client (which is given by a string)
00551 
00552       /* Was passiert hier? (auf deutsch)
00553 
00554          "AllClientStats" ist eine Map, d.h. sie verhält sich wie ein Array, auf das
00555          mit beliebigen Indices eines vorgegebenen Typs zugegriffen werden kann. In unserem
00556          Fall ist der Index von Typ "String". Wenn unter dem Index ein Eintrag gefunden wurde,
00557          so wird eine Referenz auf diesen zurückgeliefert. Wenn kein Eintrag gefunden wurde,
00558          so wird automatisch ein neuer Eintrag angelegt und ebenfalls eine Referenz davon
00559          zurückgegeben. -- Diesen Eintrag machen wir nun für den Rest dieser Funktion unter der
00560          handlicheren Referenz "&ClientStats" zugänglich.
00561 
00562          Da die Ermittlung des Eintrags ein kritischer Abschnitt ist (nicht threadsafe!), muß er
00563          durch Mutex vor konkurrierenden Zugriffen anderer Threads geschützt werden.
00564          Der Zugriff auf "&ClientStats" ist ebenfalls (innerhalb der Methoden der Klasse CClientStats)
00565          vor konkurrierenden Zugriffen geschützt.
00566 
00567          Zugriffsbeispiele:
00568 
00569           Szenario 1:
00570               Ein client mit dem Verbindungsnamen <ip> <port> "127.0.0.1 1000"
00571               hat sich erstmals an den Server verbunden.
00572            -> Beim Zugriff auf AllClientStats["127.0.0.1"] wird automatisch
00573               eine neue Instanz vom Typ CClientStats erzeugt.
00574               Diese wird für den weiteren Gebrauch innerhalb dieser Funktion
00575               unter "&ClientStats" verfügbar gemacht.
00576 
00577           Szenario 2:
00578               Ein anderer client mit dem Verbindungsnamen "myname.test.org 9181" hat sich
00579               bereits zum zweiten Mal connected.
00580            -> Der Zugriff findet über AllClientStats["myname.test.org"] statt
00581               und die bereits existierende Instanz wird für den weiteren
00582               Gebrauch innerhalb dieser Funktion unter "&ClientStats"
00583               verfügbar gemacht.
00584       */
00585 
00586       // additional remark:
00587       // concurrent access is sequenced by a MUTEX inside CClientStats!
00588 
00589       ClientStats.PutTimeStamp(); // put time stamp on for this connection
00590 
00591       do // receive a block of relations
00592         {
00593           CriticalSection.leave();
00594           // allow other threads to sneak in, because this
00595           // loop can take really a long time if we would do it atomically!
00596 
00597           //cout << "receiving relation from client." << endl;
00598 
00599           ostringstream os; // read relation into this string-stream for further processing
00600 
00601           //cout << "scanning relation" << endl;
00602           daten >> s; // starting symbol "G"
00603           if (s != "G")
00604            {
00605              MARK;
00606              cerr << "error: relation start symbol 'G' expected!" << endl;
00607 #ifdef SAFEMODE
00608              return; // abort this connection...
00609 #else
00610              exit(1); // since we are in an trusted environment, something weired must have happened!
00611 #endif
00612            }
00613 
00614           CmpqsFactor factor;
00615           daten >> factor; // read factor (dynamic-factor or special-factor of relation or 1 for static relation)
00616           os << "G " << setprecision(20) << factor; // << " ";
00617           while (daten.peek()!='\n')
00618            {
00619              char line[1024];
00620              daten.get(line,sizeof(line),'\n'); os << line;
00621              if (daten.fail())
00622               {
00623                 cerr << "stream in failstate for " << client << ", aborting thread." << endl;
00624                 return;
00625               } 
00626            }
00627           { char c; daten.get(c); os << c; }
00628           //cout << os.str();
00629       
00630           //daten.ignore(1,'\n');
00631           daten >> s;
00632 
00633           while (s=="Relationsblock_Sync")
00634            {
00635              // client wishes to sync with server
00636 #ifdef VERBOSE_INFO
00637              cout << "syncing with " << client << ", " << received_relations_counter << " received so far." << endl;
00638 #endif
00639              daten << "synced. " << flush;
00640              //cout << "synced with " << client << endl;
00641              daten >> s;
00642              if (daten.fail()) return;
00643              ClientStats.PutTimeStamp(); // put time stamp on for this connection
00644              CriticalSection.enter(); StatusReport(); CriticalSection.leave();
00645            }
00646 
00647           received_relations_counter++;
00648 
00649 #if 0
00650           // this version is less verbose
00651           CriticalSection.enter(); // still each step should be processed atomically...
00652           // okay, now we are inside the critical section
00653 #else
00654           // this version is more verbose; use it to detect lifelocks
00655 
00656           // [
00657           //   Assume you are at the crossing of two streets, then a
00658           //   DEADLOCK is a situation, where at each of the four
00659           //   entranceways there is a car waiting to enter the crossing.
00660           //
00661           //   A LIFELOCK is a different situation, where you want to cross
00662           //   a street, but there is so much traffic, that you cannot
00663           //   actually do it.
00664           //   
00665           //   Now assume a LIFELOCK, which is caused by people who are
00666           //   searching for a parking place. But you cannot leave your
00667           //   parking place because of the traffic generated by this
00668           //   LIFELOCK. Actually this can lead to a situation, that is very
00669           //   similar to a DEADLOCK, because the traffic gets jammed.
00670           //
00671           //   Well, DEADLOCKS and LIFELOCKS can be avoided by design in
00672           //   multithreaded programs. But you might still be interested to
00673           //   measure the traffic for detecting design flaws...
00674           // ]
00675 
00676           // still each step should be processed atomically...
00677           if (!CriticalSection.try_enter())
00678            {
00679              clock_t Start = times(NULL);
00680              CriticalSection.enter();
00681              clock_t Stop = times(NULL); double w=difftimes(Stop,Start);
00682              if (w>0.04) // print any client that got suspended for more than 0.04 seconds
00683                cout << "thread " <<  pthread_self() << ", " << client
00684                     << " suspended for " << difftimes(Stop,Start) << " seconds." << endl;
00685            }
00686           // okay, critical section entered
00687 #endif
00688 
00689 
00690 #ifdef SAFEMODE
00691           {
00692             // now check, whether received relation is valid
00693             istringstream is(os.str()); // provide read string as input
00694             if (!CRelation::is_valid(is))
00695              {
00696             #ifdef VERBOSE_WARN
00697                MARK;
00698                cerr << "invalid relation received from " << client << endl;
00699             #endif
00700                Relationsblock_okay=false;
00701                continue; // or should we break?
00702              }
00703             else
00704              {
00705             #ifdef VERBOSE_INFO
00706                cout << "valid relation received from " << client << endl;
00707             #endif
00708              }
00709           }
00710 #endif
00711           
00712           ClientStats.increment(factor.Type()); // increment statistical counter for this type of factor
00713           if (factor.IsTypeOf(CmpqsFactor::static_prime))
00714            {
00715              // Static Factor
00716              //cout << "Received static relation!" << endl;
00717              statistical_data::DynamicFactorRating.increment_Hits_at_position(0); // 0 dynamic factors directly involved (serversided)
00718              CriticalSection.leave();
00719 
00720              istringstream is(os.str()); // provide read string as input
00721              CRelation* GL = new CRelation();
00722              GL->set_MulticombineData(new CRelation::SMulticombineData);
00723              GL->multi_combine_init();
00724              factor = GL->multi_combine_main(is);
00725              //cout << "inserting relation into system of equations." << endl;
00726              StaticRelations::insert(GL,without_multi_combine_init);
00727              continue;
00728            }
00729           if (factor.IsTypeOf(CmpqsFactor::single_large_prime))
00730            {
00731             // Dynamic Factor
00732             TDynamicFactorRelation relation;
00733             relation.factor = factor.int_value();
00734             if (!is_dynamic_factor(relation)) // known dynamic factor? -> if so, then fill in the missing data 
00735               { // a new single large prime!
00736                 relation.fpos=DynamicRelations_to_file.tellp();
00737                 DynamicRelations_to_file << os.str(); // fast save, because the string is written without further conversion
00738                   // IMPORTANT:
00739                   // Normally we need to flush DynamicRelations_to_file here to ensure, that
00740                   // any istream pointing to the same file is up-to-date (because of possible
00741                   // correction factors ["Korrekturfaktoren"]). But this would slow down performance!
00742                   // (Flushing is only relevant, iff unflushed correction factors occur in static relations,
00743                   // and compared to the number of incoming SLP relations such events are really seldom...)
00744                   // To ensure integrity, we do the following: For the combine routines, we catch failing
00745                   // seeks and do error handling... 
00746                 // relation.append_for_sieving(); // server need not to do this!
00747                 DynamicFactorRelations.insert(relation);
00748                 SpecialRelations::split_by_primefactor(relation.factor);
00749               } 
00750             else
00751               { // factor already known!
00752                 //cout << "Can convert dynamic relation to a static relation!" << endl;
00753                 statistical_data::DynamicFactorRating.increment_Hits_at_position(1);
00754                 CriticalSection.leave();
00755 
00756                 istringstream is(os.str()); // provide read string as input
00757                 CRelation* GL = new CRelation();
00758                 GL->set_MulticombineData(new CRelation::SMulticombineData);
00759                 GL->multi_combine_init();
00760                 factor = GL->multi_combine_main(is);
00761                 {
00762                   mpz_t x; mpz_init(x);
00763                   factor.assign_to_mpz(x); // store factor into mpz-variable x
00764                   mpz_mul(GL->Delta,GL->Delta,x); mpz_mod(GL->Delta,GL->Delta,n);
00765                   mpz_clear(x);
00766                 }
00767                 istream *pis = DynamicRelations::IstreamPool::acquire_istream();
00768                 GL->multi_combine_main(*pis, relation.fpos);
00769                 DynamicRelations::IstreamPool::release_istream(pis);
00770                 //GL->multi_combine_exit();
00771                 StaticRelations::insert(GL,without_multi_combine_init);
00772               }
00773             continue;
00774            }
00775           if (factor.IsTypeOf(CmpqsFactor::double_large_prime))
00776            {
00777              // Special-Factor
00778 
00779              // Specialfactor mit Dynamicfactoren testen:
00780              /* Anmerkung:
00781                  Beim verteilten Sieben kann es passieren, dass ein (notwendigerweise zusammengesetzter)
00782                  Special-Factor von einem Client entdeckt wird, der nicht mit dem aktuellen Satz der
00783                  dynamischen Faktoren siebt. - In diesem Fall, der mit steigender Anzahl der Clients immer
00784                  wahrscheinlicher wird, könnte es sinnvoll sein, Specialfaktoren auf ihre Teilbarkeit
00785                  mit den vorliegenden dynamischen Faktoren des Servers zu testen und ggf. zu splitten.
00786                  Allerdings bringt ein Verzicht auf diesen Test keine weiteren Nachteile mit sich.
00787                  Wir haben daher auf diese - nur beim verteilten Sieben mögliche - Prozedur bisher verzichtet.
00788 
00789                  Nachtrag 2004-05-02:
00790                  Ist jetzt wieder aktuell geworden, da ich eine Option eingebaut habe,
00791                  nur bis zu einer Obergrenze der dynamischen Faktoren zu sieben. Der Rest der dynamischen
00792                  Faktoren verhält sich also passiv und könnte durchaus im Special-Factor enthalten sein,
00793                  wenn der Client ihn nicht bereits entfernt und die Relation als Single-Large-Prime-Relation
00794                  klassifiziert hat. Und das kann durchaus passieren, wenn der Client nicht im Besitz der vollen
00795                  Menge der SingleLargePrimes ist!
00796                  Wir machen es jetzt auf die unelegante Tour mit einer vollen Breitseite:
00797                   -> zuerst wird die Relation als Special-Relation aufgenommen
00798                   -> Dann werden ihre beiden Teilfaktoren auf mögliche SingleLargePrimes überprüft,
00799                      und (sollte das der Fall sein):
00800                       -> SpecialRelations::split_by_primefactor() aufgerufen.
00801               */
00802              SpecialRelations::insert(factor,os.str());
00803              if (is_dynamic_factor(factor.LP1())) SpecialRelations::split_by_primefactor(factor.LP1());
00804              if (is_dynamic_factor(factor.LP2())) SpecialRelations::split_by_primefactor(factor.LP2());
00805              continue;
00806            }
00807 
00808          cerr << "Error: Trash or a relation of unknown type has been received!" << endl;
00809          Relationsblock_okay=false;
00810         } while (s=="RL"); // new token for "Relation!"
00811       // block of relations has been read
00812       CriticalSection.leave();
00813       cout_network << "Block of " << received_relations_counter 
00814                    << " relations received from " << client << "." << endl;
00815       if (!Relationsblock_okay) daten << "ignoriert!" << flush;
00816       else
00817        if (s=="Relationsblock_Ende") daten << "empfangen." << flush;
00818        else
00819         {
00820           cerr << "could not detect (expected) end of block of relations!" << endl;
00821           daten << "Relationsblock_korrekt_beendet???" << flush;
00822         }
00823       CriticalSection.enter(); StatusReport(); CriticalSection.leave();
00824       return;
00825     }
00826 
00827   cerr << "Invalid request \"" << s << "\""
00828        << " from " << client << endl;
00829 }
00830 
00831 
00832 void* Cprocess_clients::THREAD_process_data_stream(void* arg)
00833 {
00834   ExitManager::register_cancel();
00835   const int communication_socket = *static_cast<int*>(arg);
00836   delete static_cast<int*>(arg);
00837 
00838   unix_io_stream *communication_stream = NULL;
00839 
00840   try
00841    {
00842      communication_stream = new unix_io_stream(communication_socket);
00843    }
00844   catch (exception &e)
00845    {
00846      cerr << "while constructing caught an exception: " << e.what() << endl;
00847      goto done;
00848    }
00849   catch (...)
00850    {
00851      cerr << "while constructing caught an unknown exception!" << endl;
00852      throw;
00853    }
00854 
00855   //cout << "processing thread..." << endl;
00856 
00857   try
00858    {
00859      process_data_stream(*communication_stream,peer_info(*communication_stream));
00860    }
00861   catch (exception &e)
00862    {
00863      cerr << "Cprocess_clients::THREAD_process_data_stream: caught an exception: " << e.what() << endl;
00864      goto done;
00865    }
00866   catch (...)
00867    {
00868      cerr << "Cprocess_clients::THREAD_process_data_stream: while trying to process data stream caught an unknown exception!" << endl;
00869      throw;
00870    }
00871 
00872   //cout << "... thread processed!" << endl;
00873 
00874   try
00875    {
00876      if (!communication_stream->fail()) communication_stream->flush();
00877    }
00878   catch (exception &e)
00879    {
00880      cerr << "caught an exception: " << e.what() << endl;
00881    }
00882   catch (...)
00883    {
00884      cerr << "caught an unknown exception!" << endl;
00885      throw;
00886    }
00887 
00888 done:
00889 
00890   try
00891    {
00892      delete communication_stream;
00893    }
00894   catch (exception &e)
00895    {
00896      cerr << "while destructing caught an exception: " << e.what() << endl;
00897    }
00898   catch (...)
00899    {
00900      cerr << "while destructing caught an unknown exception!" << endl;
00901      throw;
00902    }
00903 
00904   ExitManager::unregister_cancel();
00905   return 0;
00906 }

Generated on Wed Nov 7 23:29:25 2007 for Qsieve by  doxygen 1.5.4