1 /* $OpenBSD: mta.c,v 1.113 2011/08/29 21:43:09 chl Exp $ */ 2 3 /* 4 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> 5 * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> 6 * Copyright (c) 2009 Jacek Masiulaniec <jacekm@dobremiasto.net> 7 * 8 * Permission to use, copy, modify, and distribute this software for any 9 * purpose with or without fee is hereby granted, provided that the above 10 * copyright notice and this permission notice appear in all copies. 11 * 12 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 13 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 14 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 15 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 16 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 17 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 18 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 19 */ 20 21 #include <sys/types.h> 22 #include <sys/queue.h> 23 #include <sys/tree.h> 24 #include <sys/param.h> 25 #include <sys/socket.h> 26 27 #include <errno.h> 28 #include <event.h> 29 #include <imsg.h> 30 #include <netdb.h> 31 #include <pwd.h> 32 #include <signal.h> 33 #include <stdio.h> 34 #include <stdlib.h> 35 #include <string.h> 36 #include <time.h> 37 #include <unistd.h> 38 39 #include "smtpd.h" 40 #include "client.h" 41 #include "log.h" 42 43 static void mta_imsg(struct imsgev *, struct imsg *); 44 static void mta_shutdown(void); 45 static void mta_sig_handler(int, short, void *); 46 static struct mta_session *mta_lookup(u_int64_t); 47 static void mta_enter_state(struct mta_session *, int, void *); 48 static void mta_pickup(struct mta_session *, void *); 49 static void mta_event(int, short, void *); 50 static void mta_status(struct mta_session *, const char *, ...); 51 static void mta_message_status(struct envelope *, char *); 52 static void mta_message_log(struct mta_session *, struct envelope *); 53 static void mta_message_done(struct mta_session *, struct envelope *); 54 static void mta_connect_done(int, short, void *); 55 static void mta_request_datafd(struct mta_session *); 56 57 static void 58 mta_imsg(struct imsgev *iev, struct imsg *imsg) 59 { 60 struct ramqueue_batch *rq_batch; 61 struct mta_session *s; 62 struct mta_relay *relay; 63 struct envelope *e; 64 struct secret *secret; 65 struct dns *dns; 66 struct ssl *ssl; 67 68 if (iev->proc == PROC_QUEUE) { 69 switch (imsg->hdr.type) { 70 case IMSG_BATCH_CREATE: 71 rq_batch = imsg->data; 72 73 s = calloc(1, sizeof *s); 74 if (s == NULL) 75 fatal(NULL); 76 s->id = rq_batch->b_id; 77 s->state = MTA_INIT; 78 s->batch = rq_batch; 79 80 /* establish host name */ 81 if (rq_batch->rule.r_action == A_RELAYVIA) { 82 s->host = strdup(rq_batch->rule.r_value.relayhost.hostname); 83 s->flags |= MTA_FORCE_MX; 84 } 85 else 86 s->host = NULL; 87 88 /* establish port */ 89 s->port = ntohs(rq_batch->rule.r_value.relayhost.port); /* XXX */ 90 91 /* have cert? */ 92 s->cert = strdup(rq_batch->rule.r_value.relayhost.cert); 93 if (s->cert == NULL) 94 fatal(NULL); 95 else if (s->cert[0] == '\0') { 96 free(s->cert); 97 s->cert = NULL; 98 } 99 100 /* use auth? */ 101 if ((rq_batch->rule.r_value.relayhost.flags & F_SSL) && 102 (rq_batch->rule.r_value.relayhost.flags & F_AUTH)) { 103 s->flags |= MTA_USE_AUTH; 104 s->secmapid = rq_batch->rule.r_value.relayhost.secmapid; 105 } 106 107 /* force a particular SSL mode? */ 108 switch (rq_batch->rule.r_value.relayhost.flags & F_SSL) { 109 case F_SSL: 110 s->flags |= MTA_FORCE_ANYSSL; 111 break; 112 113 case F_SMTPS: 114 s->flags |= MTA_FORCE_SMTPS; 115 116 case F_STARTTLS: 117 /* client_* API by default requires STARTTLS */ 118 break; 119 120 default: 121 s->flags |= MTA_ALLOW_PLAIN; 122 } 123 124 TAILQ_INIT(&s->recipients); 125 TAILQ_INIT(&s->relays); 126 SPLAY_INSERT(mtatree, &env->mta_sessions, s); 127 return; 128 129 130 case IMSG_BATCH_APPEND: 131 e = imsg->data; 132 s = mta_lookup(e->batch_id); 133 e = malloc(sizeof *e); 134 if (e == NULL) 135 fatal(NULL); 136 *e = *(struct envelope *)imsg->data; 137 strlcpy(e->delivery.errorline, "000 init", 138 sizeof(e->delivery.errorline)); 139 140 if (s->host == NULL) { 141 s->host = strdup(e->delivery.rcpt.domain); 142 if (s->host == NULL) 143 fatal("strdup"); 144 } 145 TAILQ_INSERT_TAIL(&s->recipients, e, entry); 146 return; 147 148 case IMSG_BATCH_CLOSE: 149 rq_batch = imsg->data; 150 mta_pickup(mta_lookup(rq_batch->b_id), NULL); 151 return; 152 153 case IMSG_QUEUE_MESSAGE_FD: 154 rq_batch = imsg->data; 155 mta_pickup(mta_lookup(rq_batch->b_id), &imsg->fd); 156 return; 157 } 158 } 159 160 if (iev->proc == PROC_LKA) { 161 switch (imsg->hdr.type) { 162 case IMSG_LKA_SECRET: 163 secret = imsg->data; 164 mta_pickup(mta_lookup(secret->id), secret->secret); 165 return; 166 167 case IMSG_DNS_HOST: 168 dns = imsg->data; 169 s = mta_lookup(dns->id); 170 relay = calloc(1, sizeof *relay); 171 if (relay == NULL) 172 fatal(NULL); 173 relay->sa = dns->ss; 174 TAILQ_INSERT_TAIL(&s->relays, relay, entry); 175 return; 176 177 case IMSG_DNS_HOST_END: 178 dns = imsg->data; 179 mta_pickup(mta_lookup(dns->id), &dns->error); 180 return; 181 182 case IMSG_DNS_PTR: 183 dns = imsg->data; 184 s = mta_lookup(dns->id); 185 relay = TAILQ_FIRST(&s->relays); 186 if (dns->error) 187 strlcpy(relay->fqdn, "<unknown>", 188 sizeof relay->fqdn); 189 else 190 strlcpy(relay->fqdn, dns->host, 191 sizeof relay->fqdn); 192 mta_pickup(s, NULL); 193 return; 194 } 195 } 196 197 if (iev->proc == PROC_PARENT) { 198 switch (imsg->hdr.type) { 199 case IMSG_CONF_START: 200 if (env->sc_flags & SMTPD_CONFIGURING) 201 return; 202 env->sc_flags |= SMTPD_CONFIGURING; 203 env->sc_ssl = calloc(1, sizeof *env->sc_ssl); 204 if (env->sc_ssl == NULL) 205 fatal(NULL); 206 return; 207 208 case IMSG_CONF_SSL: 209 if (!(env->sc_flags & SMTPD_CONFIGURING)) 210 return; 211 ssl = calloc(1, sizeof *ssl); 212 if (ssl == NULL) 213 fatal(NULL); 214 *ssl = *(struct ssl *)imsg->data; 215 ssl->ssl_cert = strdup((char *)imsg->data + 216 sizeof *ssl); 217 if (ssl->ssl_cert == NULL) 218 fatal(NULL); 219 ssl->ssl_key = strdup((char *)imsg->data + 220 sizeof *ssl + ssl->ssl_cert_len); 221 if (ssl->ssl_key == NULL) 222 fatal(NULL); 223 SPLAY_INSERT(ssltree, env->sc_ssl, ssl); 224 return; 225 226 case IMSG_CONF_END: 227 if (!(env->sc_flags & SMTPD_CONFIGURING)) 228 return; 229 env->sc_flags &= ~SMTPD_CONFIGURING; 230 return; 231 232 case IMSG_CTL_VERBOSE: 233 log_verbose(*(int *)imsg->data); 234 return; 235 } 236 } 237 238 fatalx("mta_imsg: unexpected imsg"); 239 } 240 241 static void 242 mta_sig_handler(int sig, short event, void *p) 243 { 244 switch (sig) { 245 case SIGINT: 246 case SIGTERM: 247 mta_shutdown(); 248 break; 249 default: 250 fatalx("mta_sig_handler: unexpected signal"); 251 } 252 } 253 254 static void 255 mta_shutdown(void) 256 { 257 log_info("mail transfer agent exiting"); 258 _exit(0); 259 } 260 261 pid_t 262 mta(void) 263 { 264 pid_t pid; 265 266 struct passwd *pw; 267 struct event ev_sigint; 268 struct event ev_sigterm; 269 270 struct peer peers[] = { 271 { PROC_PARENT, imsg_dispatch }, 272 { PROC_QUEUE, imsg_dispatch }, 273 { PROC_LKA, imsg_dispatch } 274 }; 275 276 switch (pid = fork()) { 277 case -1: 278 fatal("mta: cannot fork"); 279 case 0: 280 break; 281 default: 282 return (pid); 283 } 284 285 ssl_init(); 286 purge_config(PURGE_EVERYTHING); 287 288 pw = env->sc_pw; 289 if (chroot(pw->pw_dir) == -1) 290 fatal("mta: chroot"); 291 if (chdir("/") == -1) 292 fatal("mta: chdir(\"/\")"); 293 294 smtpd_process = PROC_MTA; 295 setproctitle("%s", env->sc_title[smtpd_process]); 296 297 if (setgroups(1, &pw->pw_gid) || 298 setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) || 299 setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) 300 fatal("mta: cannot drop privileges"); 301 302 imsg_callback = mta_imsg; 303 event_init(); 304 305 signal_set(&ev_sigint, SIGINT, mta_sig_handler, NULL); 306 signal_set(&ev_sigterm, SIGTERM, mta_sig_handler, NULL); 307 signal_add(&ev_sigint, NULL); 308 signal_add(&ev_sigterm, NULL); 309 signal(SIGPIPE, SIG_IGN); 310 signal(SIGHUP, SIG_IGN); 311 312 config_pipes(peers, nitems(peers)); 313 config_peers(peers, nitems(peers)); 314 315 ramqueue_init(&env->sc_rqueue); 316 SPLAY_INIT(&env->mta_sessions); 317 318 if (event_dispatch() < 0) 319 fatal("event_dispatch"); 320 mta_shutdown(); 321 322 return (0); 323 } 324 325 static struct mta_session * 326 mta_lookup(u_int64_t id) 327 { 328 struct mta_session key, *res; 329 330 key.id = id; 331 if ((res = SPLAY_FIND(mtatree, &env->mta_sessions, &key)) == NULL) 332 fatalx("mta_lookup: session not found"); 333 return (res); 334 } 335 336 static void 337 mta_enter_state(struct mta_session *s, int newstate, void *p) 338 { 339 struct secret secret; 340 struct mta_relay *relay; 341 struct sockaddr *sa; 342 struct envelope *e; 343 struct smtp_client *pcb; 344 int max_reuse; 345 346 s->state = newstate; 347 348 switch (s->state) { 349 case MTA_SECRET: 350 /* 351 * Lookup AUTH secret. 352 */ 353 bzero(&secret, sizeof(secret)); 354 secret.id = s->id; 355 secret.secmapid = s->secmapid; 356 strlcpy(secret.host, s->host, sizeof(secret.host)); 357 imsg_compose_event(env->sc_ievs[PROC_LKA], IMSG_LKA_SECRET, 358 0, 0, -1, &secret, sizeof(secret)); 359 break; 360 361 case MTA_MX: 362 /* 363 * Lookup MX record. 364 */ 365 if (s->flags & MTA_FORCE_MX) 366 dns_query_host(s->host, s->port, s->id); 367 else 368 dns_query_mx(s->host, 0, s->id); 369 break; 370 371 case MTA_DATA: 372 /* 373 * Obtain message body fd. 374 */ 375 log_debug("mta: getting datafd"); 376 mta_request_datafd(s); 377 break; 378 379 case MTA_CONNECT: 380 /* 381 * Connect to the MX. 382 */ 383 if (s->flags & MTA_FORCE_ANYSSL) 384 max_reuse = 2; 385 else 386 max_reuse = 1; 387 388 /* pick next mx */ 389 while ((relay = TAILQ_FIRST(&s->relays))) { 390 if (relay->used == max_reuse) { 391 TAILQ_REMOVE(&s->relays, relay, entry); 392 free(relay); 393 continue; 394 } 395 relay->used++; 396 397 log_debug("mta: connect %s", ss_to_text(&relay->sa)); 398 sa = (struct sockaddr *)&relay->sa; 399 400 if (s->port) 401 sa_set_port(sa, s->port); 402 else if ((s->flags & MTA_FORCE_ANYSSL) && relay->used == 1) 403 sa_set_port(sa, 465); 404 else if (s->flags & MTA_FORCE_SMTPS) 405 sa_set_port(sa, 465); 406 else 407 sa_set_port(sa, 25); 408 409 s->fd = socket(sa->sa_family, SOCK_STREAM, 0); 410 if (s->fd == -1) 411 fatal("mta cannot create socket"); 412 session_socket_blockmode(s->fd, BM_NONBLOCK); 413 session_socket_no_linger(s->fd); 414 415 if (connect(s->fd, sa, sa->sa_len) == -1) { 416 if (errno != EINPROGRESS) { 417 mta_status(s, "110 connect error: %s", 418 strerror(errno)); 419 close(s->fd); 420 continue; 421 } 422 } 423 event_once(s->fd, EV_WRITE, mta_connect_done, s, NULL); 424 break; 425 } 426 427 /* tried them all? */ 428 if (relay == NULL) 429 mta_enter_state(s, MTA_DONE, NULL); 430 break; 431 432 case MTA_PTR: 433 /* 434 * Lookup PTR record of the connected host. 435 */ 436 relay = TAILQ_FIRST(&s->relays); 437 dns_query_ptr(&relay->sa, s->id); 438 break; 439 440 case MTA_PROTOCOL: 441 /* 442 * Start protocol engine. 443 */ 444 log_debug("mta: entering smtp phase"); 445 446 pcb = client_init(s->fd, s->datafp, env->sc_hostname, 1); 447 448 /* lookup SSL certificate */ 449 if (s->cert) { 450 struct ssl key, *res; 451 452 strlcpy(key.ssl_name, s->cert, sizeof(key.ssl_name)); 453 res = SPLAY_FIND(ssltree, env->sc_ssl, &key); 454 if (res == NULL) { 455 client_close(pcb); 456 s->pcb = NULL; 457 mta_status(s, "190 certificate not found"); 458 mta_enter_state(s, MTA_DONE, NULL); 459 break; 460 } 461 client_certificate(pcb, 462 res->ssl_cert, res->ssl_cert_len, 463 res->ssl_key, res->ssl_key_len); 464 } 465 466 /* choose SMTPS vs. STARTTLS */ 467 relay = TAILQ_FIRST(&s->relays); 468 if ((s->flags & MTA_FORCE_ANYSSL) && relay->used == 1) 469 client_ssl_smtps(pcb); 470 else if (s->flags & MTA_FORCE_SMTPS) 471 client_ssl_smtps(pcb); 472 else if (s->flags & MTA_ALLOW_PLAIN) 473 client_ssl_optional(pcb); 474 475 /* enable AUTH */ 476 if (s->secret) 477 client_auth(pcb, s->secret); 478 479 /* set envelope sender */ 480 e = TAILQ_FIRST(&s->recipients); 481 if (e->delivery.from.user[0] && e->delivery.from.domain[0]) 482 client_sender(pcb, "%s@%s", e->delivery.from.user, 483 e->delivery.from.domain); 484 else 485 client_sender(pcb, ""); 486 487 /* set envelope recipients */ 488 TAILQ_FOREACH(e, &s->recipients, entry) 489 client_rcpt(pcb, e, "%s@%s", e->delivery.rcpt.user, 490 e->delivery.rcpt.domain); 491 492 s->pcb = pcb; 493 event_set(&s->ev, s->fd, EV_READ|EV_WRITE, mta_event, s); 494 event_add(&s->ev, &pcb->timeout); 495 break; 496 497 case MTA_DONE: 498 /* 499 * Kill mta session. 500 */ 501 502 /* update queue status */ 503 while ((e = TAILQ_FIRST(&s->recipients))) 504 mta_message_done(s, e); 505 506 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 507 IMSG_BATCH_DONE, 0, 0, -1, NULL, 0); 508 509 /* deallocate resources */ 510 SPLAY_REMOVE(mtatree, &env->mta_sessions, s); 511 while ((relay = TAILQ_FIRST(&s->relays))) { 512 TAILQ_REMOVE(&s->relays, relay, entry); 513 free(relay); 514 } 515 516 if (s->datafp) 517 fclose(s->datafp); 518 519 free(s->secret); 520 free(s->host); 521 free(s->cert); 522 free(s); 523 break; 524 525 default: 526 fatal("mta_enter_state: unknown state"); 527 } 528 } 529 530 static void 531 mta_pickup(struct mta_session *s, void *p) 532 { 533 int error; 534 535 switch (s->state) { 536 case MTA_INIT: 537 if (s->flags & MTA_USE_AUTH) 538 mta_enter_state(s, MTA_SECRET, NULL); 539 else 540 mta_enter_state(s, MTA_MX, NULL); 541 break; 542 543 case MTA_SECRET: 544 /* LKA responded to AUTH lookup. */ 545 s->secret = strdup(p); 546 if (s->secret == NULL) 547 fatal(NULL); 548 else if (s->secret[0] == '\0') { 549 mta_status(s, "190 secrets lookup failed"); 550 mta_enter_state(s, MTA_DONE, NULL); 551 } else 552 mta_enter_state(s, MTA_MX, NULL); 553 break; 554 555 case MTA_MX: 556 /* LKA responded to DNS lookup. */ 557 if ((error = *(int *)p)) { 558 if (error == DNS_RETRY) 559 mta_status(s, "100 MX lookup failed temporarily"); 560 else if (error == DNS_EINVAL) 561 mta_status(s, "600 Invalid domain name"); 562 else if (error == DNS_ENONAME) 563 mta_status(s, "600 Domain does not exist"); 564 else if (error == DNS_ENOTFOUND) 565 mta_status(s, "600 No MX address found for domain"); 566 mta_enter_state(s, MTA_DONE, NULL); 567 } else 568 mta_enter_state(s, MTA_DATA, NULL); 569 break; 570 571 case MTA_DATA: 572 /* QUEUE replied to body fd request. */ 573 if (*(int *)p == -1) 574 fatalx("mta cannot obtain msgfd"); 575 s->datafp = fdopen(*(int *)p, "r"); 576 if (s->datafp == NULL) 577 fatal("fdopen"); 578 mta_enter_state(s, MTA_CONNECT, NULL); 579 break; 580 581 case MTA_CONNECT: 582 /* Remote accepted/rejected connection. */ 583 error = session_socket_error(s->fd); 584 if (error) { 585 mta_status(s, "110 connect error: %s", strerror(error)); 586 close(s->fd); 587 mta_enter_state(s, MTA_CONNECT, NULL); 588 } else 589 mta_enter_state(s, MTA_PTR, NULL); 590 break; 591 592 case MTA_PTR: 593 mta_enter_state(s, MTA_PROTOCOL, NULL); 594 break; 595 596 default: 597 fatalx("mta_pickup: unexpected state"); 598 } 599 } 600 601 static void 602 mta_event(int fd, short event, void *p) 603 { 604 struct mta_session *s = p; 605 struct smtp_client *pcb = s->pcb; 606 607 if (event & EV_TIMEOUT) { 608 mta_status(s, "150 timeout"); 609 goto out; 610 } 611 612 switch (client_talk(pcb, event & EV_WRITE)) { 613 case CLIENT_WANT_WRITE: 614 goto rw; 615 case CLIENT_STOP_WRITE: 616 goto ro; 617 case CLIENT_RCPT_FAIL: 618 mta_message_status(pcb->rcptfail, pcb->reply); 619 mta_message_log(s, pcb->rcptfail); 620 mta_message_done(s, pcb->rcptfail); 621 goto rw; 622 case CLIENT_DONE: 623 mta_status(s, "%s", pcb->status); 624 break; 625 default: 626 fatalx("mta_event: unexpected code"); 627 } 628 629 out: 630 client_close(pcb); 631 s->pcb = NULL; 632 633 if (TAILQ_EMPTY(&s->recipients)) 634 mta_enter_state(s, MTA_DONE, NULL); 635 else 636 mta_enter_state(s, MTA_CONNECT, NULL); 637 return; 638 639 rw: 640 event_set(&s->ev, fd, EV_READ|EV_WRITE, mta_event, s); 641 event_add(&s->ev, &pcb->timeout); 642 return; 643 644 ro: 645 event_set(&s->ev, fd, EV_READ, mta_event, s); 646 event_add(&s->ev, &pcb->timeout); 647 } 648 649 static void 650 mta_status(struct mta_session *s, const char *fmt, ...) 651 { 652 char *status; 653 struct envelope *e, *next; 654 va_list ap; 655 656 va_start(ap, fmt); 657 if (vasprintf(&status, fmt, ap) == -1) 658 fatal("vasprintf"); 659 va_end(ap); 660 661 for (e = TAILQ_FIRST(&s->recipients); e; e = next) { 662 next = TAILQ_NEXT(e, entry); 663 664 /* save new status */ 665 mta_message_status(e, status); 666 667 /* remove queue entry */ 668 if (*status == '2' || *status == '5' || *status == '6') { 669 mta_message_log(s, e); 670 mta_message_done(s, e); 671 } 672 } 673 674 free(status); 675 } 676 677 static void 678 mta_message_status(struct envelope *e, char *status) 679 { 680 /* 681 * Previous delivery attempts might have assigned an errorline of 682 * higher status (eg. 5yz is of higher status than 4yz), so check 683 * this before deciding to overwrite existing status with a new one. 684 */ 685 if (*status != '2' && strncmp(e->delivery.errorline, status, 3) > 0) 686 return; 687 688 /* change status */ 689 log_debug("mta: new status for %s@%s: %s", e->delivery.rcpt.user, 690 e->delivery.rcpt.domain, status); 691 strlcpy(e->delivery.errorline, status, sizeof(e->delivery.errorline)); 692 } 693 694 static void 695 mta_message_log(struct mta_session *s, struct envelope *e) 696 { 697 struct mta_relay *relay = TAILQ_FIRST(&s->relays); 698 char *status = e->delivery.errorline; 699 700 log_info("%016llx: to=<%s@%s>, delay=%lld, relay=%s [%s], stat=%s (%s)", 701 e->delivery.id, e->delivery.rcpt.user, 702 e->delivery.rcpt.domain, 703 (long long int) (time(NULL) - e->delivery.creation), 704 relay ? relay->fqdn : "(none)", 705 relay ? ss_to_text(&relay->sa) : "", 706 *status == '2' ? "Sent" : 707 *status == '5' ? "RemoteError" : 708 *status == '4' ? "RemoteError" : "LocalError", 709 status + 4); 710 } 711 712 static void 713 mta_message_done(struct mta_session *s, struct envelope *e) 714 { 715 switch (e->delivery.errorline[0]) { 716 case '6': 717 case '5': 718 e->delivery.status = DS_PERMFAILURE; 719 break; 720 case '2': 721 e->delivery.status = DS_ACCEPTED; 722 break; 723 default: 724 e->delivery.status = DS_TEMPFAILURE; 725 break; 726 } 727 imsg_compose_event(env->sc_ievs[PROC_QUEUE], 728 IMSG_QUEUE_MESSAGE_UPDATE, 0, 0, -1, e, sizeof(*e)); 729 TAILQ_REMOVE(&s->recipients, e, entry); 730 free(e); 731 } 732 733 static void 734 mta_connect_done(int fd, short event, void *p) 735 { 736 mta_pickup(p, NULL); 737 } 738 739 static void 740 mta_request_datafd(struct mta_session *s) 741 { 742 struct ramqueue_batch rq_batch; 743 struct envelope *e; 744 745 e = TAILQ_FIRST(&s->recipients); 746 747 rq_batch.b_id = s->id; 748 rq_batch.msgid = evpid_to_msgid(e->delivery.id); 749 imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_MESSAGE_FD, 750 0, 0, -1, &rq_batch, sizeof(rq_batch)); 751 } 752 753 int 754 mta_session_cmp(struct mta_session *a, struct mta_session *b) 755 { 756 return (a->id < b->id ? -1 : a->id > b->id); 757 } 758 759 SPLAY_GENERATE(mtatree, mta_session, entry, mta_session_cmp); 760