00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "rsync.h"
00038
00039
00040 #define SELECT_TIMEOUT 60
00041
00042 static int io_multiplexing_out;
00043 static int io_multiplexing_in;
00044 static int multiplex_in_fd;
00045 static int multiplex_out_fd;
00046 static time_t last_io;
00047 static int no_flush;
00048
00049 extern int bwlimit;
00050 extern int verbose;
00051 extern int io_timeout;
00052 extern struct stats stats;
00053
00054
00055 const char phase_unknown[] = "unknown";
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070 const char *io_write_phase = phase_unknown;
00071 const char *io_read_phase = phase_unknown;
00072
00073
00074
00075 int kludge_around_eof = False;
00076
00077
00078 static int io_error_fd = -1;
00079
00080 static void read_loop(int fd, char *buf, size_t len);
00081
00082 static void check_timeout(void)
00083 {
00084 extern int am_server, am_daemon;
00085 time_t t;
00086
00087 err_list_push();
00088
00089 if (!io_timeout) return;
00090
00091 if (!last_io) {
00092 last_io = time(NULL);
00093 return;
00094 }
00095
00096 t = time(NULL);
00097
00098 if (last_io && io_timeout && (t-last_io) >= io_timeout) {
00099 if (!am_server && !am_daemon) {
00100 rprintf(FERROR,"io timeout after %d seconds - exiting\n",
00101 (int)(t-last_io));
00102 }
00103 exit_cleanup(RERR_TIMEOUT);
00104 }
00105 }
00106
00107
00108 void io_set_error_fd(int fd)
00109 {
00110 io_error_fd = fd;
00111 }
00112
00113
00114 static void read_error_fd(void)
00115 {
00116 char buf[200];
00117 size_t n;
00118 int fd = io_error_fd;
00119 int tag, len;
00120
00121
00122
00123 io_error_fd = -1;
00124
00125 read_loop(fd, buf, 4);
00126 tag = IVAL(buf, 0);
00127
00128 len = tag & 0xFFFFFF;
00129 tag = tag >> 24;
00130 tag -= MPLEX_BASE;
00131
00132 while (len) {
00133 n = len;
00134 if (n > (sizeof(buf)-1))
00135 n = sizeof(buf)-1;
00136 read_loop(fd, buf, n);
00137 rwrite((enum logcode)tag, buf, n);
00138 len -= n;
00139 }
00140
00141 io_error_fd = fd;
00142 }
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155 static void whine_about_eof (void)
00156 {
00157 if (kludge_around_eof)
00158 exit_cleanup (0);
00159 else {
00160 rprintf (FERROR,
00161 "%s: connection unexpectedly closed "
00162 "(%.0f bytes read so far)\n",
00163 RSYNC_NAME, (double)stats.total_read);
00164
00165 exit_cleanup (RERR_STREAMIO);
00166 }
00167 }
00168
00169
00170 static void die_from_readerr (int err)
00171 {
00172
00173 io_multiplexing_close();
00174
00175 rprintf(FERROR, "%s: read error: %s\n",
00176 RSYNC_NAME, strerror (err));
00177 exit_cleanup(RERR_STREAMIO);
00178 }
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192 static int read_timeout (int fd, char *buf, size_t len)
00193 {
00194 int n, ret=0;
00195
00196 io_flush();
00197
00198 while (ret == 0) {
00199
00200 fd_set fds;
00201 struct timeval tv;
00202 int fd_count = fd+1;
00203 int count;
00204
00205 FD_ZERO(&fds);
00206 FD_SET(fd, &fds);
00207 if (io_error_fd != -1) {
00208 FD_SET(io_error_fd, &fds);
00209 if (io_error_fd > fd) fd_count = io_error_fd+1;
00210 }
00211
00212 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
00213 tv.tv_usec = 0;
00214
00215 errno = 0;
00216
00217 count = select(fd_count, &fds, NULL, NULL, &tv);
00218
00219 if (count == 0) {
00220 check_timeout();
00221 }
00222
00223 if (count <= 0) {
00224 if (errno == EBADF) {
00225 exit_cleanup(RERR_SOCKETIO);
00226 }
00227 continue;
00228 }
00229
00230 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &fds)) {
00231 read_error_fd();
00232 }
00233
00234 if (!FD_ISSET(fd, &fds)) continue;
00235
00236 n = read(fd, buf, len);
00237
00238 if (n > 0) {
00239 buf += n;
00240 len -= n;
00241 ret += n;
00242 if (io_timeout)
00243 last_io = time(NULL);
00244 continue;
00245 } else if (n == 0) {
00246 whine_about_eof ();
00247 return -1;
00248 } else if (n == -1) {
00249 if (errno == EINTR || errno == EWOULDBLOCK ||
00250 errno == EAGAIN)
00251 continue;
00252 else
00253 die_from_readerr (errno);
00254 }
00255 }
00256
00257 return ret;
00258 }
00259
00260
00261
00262
00263
00264
00265
00266
/*
 * Keep calling read_timeout() until exactly len bytes have been stored
 * in buf.
 */
static void read_loop (int fd, char *buf, size_t len)
{
	char *dst = buf;
	size_t left = len;

	for (; left; ) {
		int n = read_timeout(fd, dst, left);

		dst += n;
		left -= n;
	}
}
00276
00277
00278
00279
00280
00281
00282
00283
00284 static int read_unbuffered(int fd, char *buf, size_t len)
00285 {
00286 static size_t remaining;
00287 int tag, ret = 0;
00288 char line[1024];
00289
00290 if (!io_multiplexing_in || fd != multiplex_in_fd)
00291 return read_timeout(fd, buf, len);
00292
00293 while (ret == 0) {
00294 if (remaining) {
00295 len = MIN(len, remaining);
00296 read_loop(fd, buf, len);
00297 remaining -= len;
00298 ret = len;
00299 continue;
00300 }
00301
00302 read_loop(fd, line, 4);
00303 tag = IVAL(line, 0);
00304
00305 remaining = tag & 0xFFFFFF;
00306 tag = tag >> 24;
00307
00308 if (tag == MPLEX_BASE)
00309 continue;
00310
00311 tag -= MPLEX_BASE;
00312
00313 if (tag != FERROR && tag != FINFO) {
00314 rprintf(FERROR, "unexpected tag %d\n", tag);
00315 exit_cleanup(RERR_STREAMIO);
00316 }
00317
00318 if (remaining > sizeof(line) - 1) {
00319 rprintf(FERROR, "multiplexing overflow %d\n\n",
00320 remaining);
00321 exit_cleanup(RERR_STREAMIO);
00322 }
00323
00324 read_loop(fd, line, remaining);
00325 line[remaining] = 0;
00326
00327 rprintf((enum logcode) tag, "%s", line);
00328 remaining = 0;
00329 }
00330
00331 return ret;
00332 }
00333
00334
00335
00336
00337
00338
00339
00340
00341 static void readfd (int fd, char *buffer, size_t N)
00342 {
00343 int ret;
00344 size_t total=0;
00345
00346 while (total < N) {
00347 io_flush();
00348
00349 ret = read_unbuffered (fd, buffer + total, N-total);
00350 total += ret;
00351 }
00352
00353 stats.total_read += total;
00354 }
00355
00356
00357 int32 read_int(int f)
00358 {
00359 char b[4];
00360 int32 ret;
00361
00362 readfd(f,b,4);
00363 ret = IVAL(b,0);
00364 if (ret == (int32)0xffffffff) return -1;
00365 return ret;
00366 }
00367
00368 int64 read_longint(int f)
00369 {
00370 extern int remote_version;
00371 int64 ret;
00372 char b[8];
00373 ret = read_int(f);
00374
00375 if ((int32)ret != (int32)0xffffffff) {
00376 return ret;
00377 }
00378
00379 #ifdef NO_INT64
00380 rprintf(FERROR,"Integer overflow - attempted 64 bit offset\n");
00381 exit_cleanup(RERR_UNSUPPORTED);
00382 #else
00383 if (remote_version >= 16) {
00384 readfd(f,b,8);
00385 ret = IVAL(b,0) | (((int64)IVAL(b,4))<<32);
00386 }
00387 #endif
00388
00389 return ret;
00390 }
00391
/* Read exactly len bytes from f into buf. */
void read_buf(int f, char *buf, size_t len)
{
	readfd(f, buf, len);
}
00396
/*
 * Read exactly len bytes from f into buf and NUL-terminate the result.
 * buf must have room for len + 1 bytes.
 */
void read_sbuf(int f, char *buf, size_t len)
{
	read_buf (f, buf, len);
	buf[len] = 0;
}
00402
/* Read a single byte from f and return it. */
unsigned char read_byte(int f)
{
	unsigned char byte;

	read_buf (f, (char *)&byte, 1);
	return byte;
}
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420 static void sleep_for_bwlimit(int bytes_written)
00421 {
00422 struct timeval tv;
00423
00424 if (!bwlimit)
00425 return;
00426
00427 assert(bytes_written > 0);
00428 assert(bwlimit > 0);
00429
00430 tv.tv_usec = bytes_written * 1000 / bwlimit;
00431 tv.tv_sec = tv.tv_usec / 1000000;
00432 tv.tv_usec = tv.tv_usec % 1000000;
00433
00434 select(0, NULL, NULL, NULL, &tv);
00435 }
00436
00437
00438
00439
00440
00441
00442
00443
00444 static void writefd_unbuffered(int fd,char *buf,size_t len)
00445 {
00446 size_t total = 0;
00447 fd_set w_fds, r_fds;
00448 int fd_count, count;
00449 struct timeval tv;
00450
00451 err_list_push();
00452
00453 no_flush++;
00454
00455 while (total < len) {
00456 FD_ZERO(&w_fds);
00457 FD_ZERO(&r_fds);
00458 FD_SET(fd,&w_fds);
00459 fd_count = fd;
00460
00461 if (io_error_fd != -1) {
00462 FD_SET(io_error_fd,&r_fds);
00463 if (io_error_fd > fd_count)
00464 fd_count = io_error_fd;
00465 }
00466
00467 tv.tv_sec = io_timeout?io_timeout:SELECT_TIMEOUT;
00468 tv.tv_usec = 0;
00469
00470 errno = 0;
00471
00472 count = select(fd_count+1,
00473 io_error_fd != -1?&r_fds:NULL,
00474 &w_fds,NULL,
00475 &tv);
00476
00477 if (count == 0) {
00478 check_timeout();
00479 }
00480
00481 if (count <= 0) {
00482 if (errno == EBADF) {
00483 exit_cleanup(RERR_SOCKETIO);
00484 }
00485 continue;
00486 }
00487
00488 if (io_error_fd != -1 && FD_ISSET(io_error_fd, &r_fds)) {
00489 read_error_fd();
00490 }
00491
00492 if (FD_ISSET(fd, &w_fds)) {
00493 int ret;
00494 size_t n = len-total;
00495 ret = write(fd,buf+total,n);
00496
00497 if (ret == -1 && errno == EINTR) {
00498 continue;
00499 }
00500
00501 if (ret == -1 &&
00502 (errno == EWOULDBLOCK || errno == EAGAIN)) {
00503 msleep(1);
00504 continue;
00505 }
00506
00507 if (ret <= 0) {
00508
00509
00510 io_multiplexing_close();
00511 rprintf(FERROR, RSYNC_NAME
00512 ": writefd_unbuffered failed to write %ld bytes: phase \"%s\": %s\n",
00513 (long) len, io_write_phase,
00514 strerror(errno));
00515 exit_cleanup(RERR_STREAMIO);
00516 }
00517
00518 sleep_for_bwlimit(ret);
00519
00520 total += ret;
00521
00522 if (io_timeout)
00523 last_io = time(NULL);
00524 }
00525 }
00526
00527 no_flush--;
00528 }
00529
00530
00531 static char *io_buffer;
00532 static int io_buffer_count;
00533
00534 void io_start_buffering(int fd)
00535 {
00536 if (io_buffer) return;
00537 multiplex_out_fd = fd;
00538 io_buffer = (char *)malloc(IO_BUFFER_SIZE);
00539 if (!io_buffer) out_of_memory("writefd");
00540 io_buffer_count = 0;
00541 }
00542
00543
00544
00545
00546
00547 static void mplex_write(int fd, enum logcode code, char *buf, size_t len)
00548 {
00549 char buffer[4096];
00550 size_t n = len;
00551
00552 SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
00553
00554 if (n > (sizeof(buffer)-4)) {
00555 n = sizeof(buffer)-4;
00556 }
00557
00558 memcpy(&buffer[4], buf, n);
00559 writefd_unbuffered(fd, buffer, n+4);
00560
00561 len -= n;
00562 buf += n;
00563
00564 if (len) {
00565 writefd_unbuffered(fd, buf, len);
00566 }
00567 }
00568
00569
00570 void io_flush(void)
00571 {
00572 int fd = multiplex_out_fd;
00573
00574 err_list_push();
00575
00576 if (!io_buffer_count || no_flush) return;
00577
00578 if (io_multiplexing_out) {
00579 mplex_write(fd, FNONE, io_buffer, io_buffer_count);
00580 } else {
00581 writefd_unbuffered(fd, io_buffer, io_buffer_count);
00582 }
00583 io_buffer_count = 0;
00584 }
00585
00586
00587 void io_end_buffering(void)
00588 {
00589 io_flush();
00590 if (!io_multiplexing_out) {
00591 free(io_buffer);
00592 io_buffer = NULL;
00593 }
00594 }
00595
00596 static void writefd(int fd,char *buf,size_t len)
00597 {
00598 stats.total_written += len;
00599
00600 err_list_push();
00601
00602 if (!io_buffer || fd != multiplex_out_fd) {
00603 writefd_unbuffered(fd, buf, len);
00604 return;
00605 }
00606
00607 while (len) {
00608 int n = MIN((int) len, IO_BUFFER_SIZE-io_buffer_count);
00609 if (n > 0) {
00610 memcpy(io_buffer+io_buffer_count, buf, n);
00611 buf += n;
00612 len -= n;
00613 io_buffer_count += n;
00614 }
00615
00616 if (io_buffer_count == IO_BUFFER_SIZE) io_flush();
00617 }
00618 }
00619
00620
00621 void write_int(int f,int32 x)
00622 {
00623 char b[4];
00624 SIVAL(b,0,x);
00625 writefd(f,b,4);
00626 }
00627
00628
00629 void write_int_named(int f, int32 x, const char *phase)
00630 {
00631 io_write_phase = phase;
00632 write_int(f, x);
00633 io_write_phase = phase_unknown;
00634 }
00635
00636
00637
00638
00639
00640
00641 void write_longint(int f, int64 x)
00642 {
00643 extern int remote_version;
00644 char b[8];
00645
00646 if (remote_version < 16 || x <= 0x7FFFFFFF) {
00647 write_int(f, (int)x);
00648 return;
00649 }
00650
00651 write_int(f, (int32)0xFFFFFFFF);
00652 SIVAL(b,0,(x&0xFFFFFFFF));
00653 SIVAL(b,4,((x>>32)&0xFFFFFFFF));
00654
00655 writefd(f,b,8);
00656 }
00657
/* Write len bytes of buf to f. */
void write_buf(int f, char *buf, size_t len)
{
	writefd(f, buf, len);
}
00662
00663
/* Write the NUL-terminated string buf to f; the terminator is not sent. */
static void write_sbuf(int f, char *buf)
{
	size_t n = strlen(buf);

	write_buf(f, buf, n);
}
00668
00669
/* Write the single byte c to f. */
void write_byte(int f, unsigned char c)
{
	write_buf(f, (char *)&c, 1);
}
00674
00675
00676
00677
00678
00679
00680
00681
00682
/*
 * Read a line from f into buf, one byte at a time.
 *
 * '\r' bytes are dropped and the terminating '\n' is replaced by NUL.
 * At most maxlen characters are stored, and a NUL is written after
 * them, so buf must have room for maxlen + 1 bytes.
 *
 * Returns 1 if a complete line was read.  Returns 0 if a NUL byte was
 * read or if maxlen characters were stored without seeing a newline.
 */
int read_line(int f, char *buf, size_t maxlen)
{
	char *p = buf;
	size_t room = maxlen;

	while (room != 0) {
		*p = 0;
		read_buf(f, p, 1);

		switch (*p) {
		case 0:
			return 0;
		case '\n':
			*p = 0;
			return 1;
		case '\r':
			/* Not kept: the next byte overwrites it. */
			break;
		default:
			p++;
			room--;
			break;
		}
	}

	/* Ran out of room before a newline arrived. */
	*p = 0;
	return 0;
}
00706
00707
00708 void io_printf(int fd, const char *format, ...)
00709 {
00710 va_list ap;
00711 char buf[1024];
00712 int len;
00713
00714 va_start(ap, format);
00715 len = vsnprintf(buf, sizeof(buf), format, ap);
00716 va_end(ap);
00717
00718 if (len < 0) exit_cleanup(RERR_STREAMIO);
00719
00720 write_sbuf(fd, buf);
00721 }
00722
00723
00724
00725 void io_start_multiplex_out(int fd)
00726 {
00727 multiplex_out_fd = fd;
00728 io_flush();
00729 io_start_buffering(fd);
00730 io_multiplexing_out = 1;
00731 }
00732
00733
00734 void io_start_multiplex_in(int fd)
00735 {
00736 multiplex_in_fd = fd;
00737 io_flush();
00738 io_multiplexing_in = 1;
00739 }
00740
00741
00742 int io_multiplex_write(enum logcode code, char *buf, size_t len)
00743 {
00744 if (!io_multiplexing_out) return 0;
00745
00746 io_flush();
00747 stats.total_written += (len+4);
00748 mplex_write(multiplex_out_fd, code, buf, len);
00749 return 1;
00750 }
00751
00752
00753 void io_multiplexing_close(void)
00754 {
00755 io_multiplexing_out = 0;
00756 }
00757