diff --git a/daemon/main.go b/daemon/main.go index 2ea17cef..15b721dc 100644 --- a/daemon/main.go +++ b/daemon/main.go @@ -33,7 +33,7 @@ var ( err = (error)(nil) rules = (*rule.Loader)(nil) stats = (*statistics.Statistics)(nil) - queue = (*netfilter.NFQueue)(nil) + queue = (*netfilter.Queue)(nil) pktChan = (<-chan netfilter.NFPacket)(nil) wrkChan = (chan netfilter.NFPacket)(nil) sigChan = (chan os.Signal)(nil) @@ -208,11 +208,11 @@ func main() { // prepare the queue setupWorkers() - queue, err := netfilter.NewNFQueue(uint16(queueNum), 4096, netfilter.NF_DEFAULT_PACKET_SIZE) + queue, err := netfilter.NewQueue(uint16(queueNum), 4096, netfilter.NF_DEFAULT_PACKET_SIZE) if err != nil { log.Fatal("Error while creating queue #%d: %s", queueNum, err) } - pktChan = queue.GetPackets() + pktChan = queue.Packets() // queue is ready, run firewall rules if err = firewall.QueueDNSResponses(true, queueNum); err != nil { diff --git a/daemon/netfilter/netfilter.c b/daemon/netfilter/netfilter.c deleted file mode 100644 index 0a9ead14..00000000 --- a/daemon/netfilter/netfilter.c +++ /dev/null @@ -1,2 +0,0 @@ -#include "netfilter.h" - diff --git a/daemon/netfilter/netfilter.go b/daemon/netfilter/netfilter.go deleted file mode 100644 index fb253bf4..00000000 --- a/daemon/netfilter/netfilter.go +++ /dev/null @@ -1,226 +0,0 @@ -package netfilter - -/* -#cgo pkg-config: libnetfilter_queue -#cgo CFLAGS: -Wall -I/usr/include -#cgo LDFLAGS: -L/usr/lib64/ - -#include "netfilter.h" -*/ -import "C" - -import ( - "fmt" - "os" - "sync" - "time" - "unsafe" - - "github.com/google/gopacket" - "github.com/google/gopacket/layers" -) - -//Verdict for a packet -type Verdict C.uint - -//Container for a verdict and (possibly) a modified packet (C side) -type VerdictContainerC C.verdictContainer - -//Container for a verdict and (possibly) a modified packet (Go side) -type VerdictContainer struct { - Verdict Verdict - Mark uint32 - Packet []byte -} - -type NFPacket struct { - Packet gopacket.Packet - Mark uint32 - verdictChannel chan VerdictContainer -} - -//Set the verdict for the packet -func (p *NFPacket) SetVerdict(v Verdict) { - p.verdictChannel <- VerdictContainer{Verdict: v, Packet: nil, Mark: 0} -} - -func (p *NFPacket) SetVerdictAndMark(v Verdict, mark uint32) { - p.verdictChannel <- VerdictContainer{Verdict: v, Packet: nil, Mark: mark} -} - -//Set the verdict for the packet (in the case of requeue) -func (p *NFPacket) SetRequeueVerdict(newQueueId uint16) { - v := uint(NF_QUEUE) - q := (uint(newQueueId) << 16) - v = v | q - p.verdictChannel <- VerdictContainer{Verdict: Verdict(v), Packet: nil, Mark: 0} -} - -//Set the verdict for the packet AND provide new packet content for injection -func (p *NFPacket) SetVerdictWithPacket(v Verdict, packet []byte) { - p.verdictChannel <- VerdictContainer{Verdict: v, Packet: packet, Mark: 0} -} - -type NFQueue struct { - h *C.struct_nfq_handle - qh *C.struct_nfq_q_handle - fd C.int - packets chan NFPacket - idx uint32 -} - -const ( - AF_INET = 2 - AF_INET6 = 10 - - NF_DROP Verdict = 0 - NF_ACCEPT Verdict = 1 - NF_STOLEN Verdict = 2 - NF_QUEUE Verdict = 3 - NF_REPEAT Verdict = 4 - NF_STOP Verdict = 5 - - NF_DEFAULT_PACKET_SIZE uint32 = 0xffff - - ipv4version = 0x40 -) - -var theTable = make(map[uint32]*chan NFPacket, 0) -var theTabeLock sync.RWMutex - -//Create and bind to queue specified by queueId -func NewNFQueue(queueId uint16, maxPacketsInQueue uint32, packetSize uint32) (*NFQueue, error) { - var nfq = NFQueue{} - var err error - var ret C.int - - if nfq.h, err = C.nfq_open(); err != nil { - return nil, fmt.Errorf("Error opening NFQueue handle: %v\n", err) - } - - if ret, err = C.nfq_unbind_pf(nfq.h, AF_INET); err != nil || ret < 0 { - return nil, fmt.Errorf("Error unbinding existing NFQ handler from AF_INET protocol family: %v\n", err) - } - - if ret, err = C.nfq_unbind_pf(nfq.h, AF_INET6); err != nil || ret < 0 { - return nil, fmt.Errorf("Error unbinding existing NFQ handler from AF_INET6 protocol family: %v\n", err) - } - - if ret, err := C.nfq_bind_pf(nfq.h, AF_INET); err != nil || ret < 0 { - return nil, fmt.Errorf("Error binding to AF_INET protocol family: %v\n", err) - } - - if ret, err := C.nfq_bind_pf(nfq.h, AF_INET6); err != nil || ret < 0 { - return nil, fmt.Errorf("Error binding to AF_INET6 protocol family: %v\n", err) - } - - nfq.packets = make(chan NFPacket) - nfq.idx = uint32(time.Now().UnixNano()) - theTabeLock.Lock() - theTable[nfq.idx] = &nfq.packets - theTabeLock.Unlock() - if nfq.qh, err = C.CreateQueue(nfq.h, C.u_int16_t(queueId), C.u_int32_t(nfq.idx)); err != nil || nfq.qh == nil { - C.nfq_close(nfq.h) - return nil, fmt.Errorf("Error binding to queue: %v\n", err) - } - - if ret, err = C.nfq_set_queue_maxlen(nfq.qh, C.u_int32_t(maxPacketsInQueue)); err != nil || ret < 0 { - C.nfq_destroy_queue(nfq.qh) - C.nfq_close(nfq.h) - return nil, fmt.Errorf("Unable to set max packets in queue: %v\n", err) - } - - if C.nfq_set_mode(nfq.qh, C.u_int8_t(2), C.uint(packetSize)) < 0 { - C.nfq_destroy_queue(nfq.qh) - C.nfq_close(nfq.h) - return nil, fmt.Errorf("Unable to set packets copy mode: %v\n", err) - } - - if nfq.fd, err = C.nfq_fd(nfq.h); err != nil { - C.nfq_destroy_queue(nfq.qh) - C.nfq_close(nfq.h) - return nil, fmt.Errorf("Unable to get queue file-descriptor. %v\n", err) - } - - go nfq.run() - - return &nfq, nil -} - -//Unbind and close the queue -func (nfq *NFQueue) Close() { - C.nfq_destroy_queue(nfq.qh) - C.nfq_close(nfq.h) - theTabeLock.Lock() - delete(theTable, nfq.idx) - theTabeLock.Unlock() -} - -//Get the channel for packets -func (nfq *NFQueue) GetPackets() <-chan NFPacket { - return nfq.packets -} - -func (nfq *NFQueue) run() { - if errno := C.Run(nfq.h, nfq.fd); errno != 0 { - fmt.Fprintf(os.Stderr, "Terminating, unable to receive packet due to errno=%d\n", errno) - } -} - -//export go_callback -func go_callback(queueId C.int, data *C.uchar, length C.int, mark C.uint, idx uint32, vc *VerdictContainerC) { - xdata := C.GoBytes(unsafe.Pointer(data), length) - - var packet gopacket.Packet - if xdata[0]&0xf0 == ipv4version { - packet = gopacket.NewPacket(xdata, layers.LayerTypeIPv4, gopacket.DecodeOptions{Lazy: true, NoCopy: true}) - } else { - packet = gopacket.NewPacket(xdata, layers.LayerTypeIPv6, gopacket.DecodeOptions{Lazy: true, NoCopy: true}) - } - - p := NFPacket{ - verdictChannel: make(chan VerdictContainer), - Mark: uint32(mark), - Packet: packet, - } - - theTabeLock.RLock() - cb, ok := theTable[idx] - theTabeLock.RUnlock() - if !ok { - fmt.Fprintf(os.Stderr, "Dropping, unexpectedly due to bad idx=%d\n", idx) - (*vc).verdict = C.uint(NF_DROP) - (*vc).data = nil - (*vc).mark_set = 0 - (*vc).length = 0 - } - select { - case *cb <- p: - select { - case v := <-p.verdictChannel: - if v.Packet == nil { - (*vc).verdict = C.uint(v.Verdict) - (*vc).data = nil - (*vc).length = 0 - } else { - (*vc).verdict = C.uint(v.Verdict) - (*vc).data = (*C.uchar)(unsafe.Pointer(&v.Packet[0])) - (*vc).length = C.uint(len(v.Packet)) - } - - if v.Mark != 0 { - (*vc).mark_set = C.uint(1) - (*vc).mark = C.uint(v.Mark) - } else { - (*vc).mark_set = C.uint(0) - } - } - - default: - fmt.Fprintf(os.Stderr, "Ignoring unexpectedly due to no recv, idx=%d\n", idx) - (*vc).verdict = C.uint(NF_ACCEPT) - (*vc).data = nil - (*vc).mark_set = 0 - (*vc).length = 0 - } -} diff --git a/daemon/netfilter/packet.go b/daemon/netfilter/packet.go new file mode 100644 index 00000000..542c4ba9 --- /dev/null +++ b/daemon/netfilter/packet.go @@ -0,0 +1,40 @@ +package netfilter + +import "C" + +import ( + "github.com/google/gopacket" +) + +type Verdict C.uint + +type VerdictContainer struct { + Verdict Verdict + Mark uint32 + Packet []byte +} + +type NFPacket struct { + Packet gopacket.Packet + Mark uint32 + verdictChannel chan VerdictContainer +} + +func (p *NFPacket) SetVerdict(v Verdict) { + p.verdictChannel <- VerdictContainer{Verdict: v, Packet: nil, Mark: 0} +} + +func (p *NFPacket) SetVerdictAndMark(v Verdict, mark uint32) { + p.verdictChannel <- VerdictContainer{Verdict: v, Packet: nil, Mark: mark} +} + +func (p *NFPacket) SetRequeueVerdict(newQueueId uint16) { + v := uint(NF_QUEUE) + q := (uint(newQueueId) << 16) + v = v | q + p.verdictChannel <- VerdictContainer{Verdict: Verdict(v), Packet: nil, Mark: 0} +} + +func (p *NFPacket) SetVerdictWithPacket(v Verdict, packet []byte) { + p.verdictChannel <- VerdictContainer{Verdict: v, Packet: packet, Mark: 0} +} diff --git a/daemon/netfilter/queue.c b/daemon/netfilter/queue.c new file mode 100644 index 00000000..f2b7ef64 --- /dev/null +++ b/daemon/netfilter/queue.c @@ -0,0 +1,2 @@ +#include "queue.h" + diff --git a/daemon/netfilter/queue.go b/daemon/netfilter/queue.go new file mode 100644 index 00000000..4a0334e6 --- /dev/null +++ b/daemon/netfilter/queue.go @@ -0,0 +1,174 @@ +package netfilter + +/* +#cgo pkg-config: libnetfilter_queue +#cgo CFLAGS: -Wall -I/usr/include +#cgo LDFLAGS: -L/usr/lib64/ + +#include "queue.h" +*/ +import "C" + +import ( + "fmt" + "os" + "sync" + "time" + "unsafe" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +const ( + AF_INET = 2 + AF_INET6 = 10 + + NF_DROP Verdict = 0 + NF_ACCEPT Verdict = 1 + NF_STOLEN Verdict = 2 + NF_QUEUE Verdict = 3 + NF_REPEAT Verdict = 4 + NF_STOP Verdict = 5 + + NF_DEFAULT_PACKET_SIZE uint32 = 0xffff + + ipv4version = 0x40 +) + +var ( + queueIndex = make(map[uint32]*chan NFPacket, 0) + queueIndexLock = sync.RWMutex{} + + gopacketDecodeOptions = gopacket.DecodeOptions{Lazy: true, NoCopy: true} +) + +type VerdictContainerC C.verdictContainer + +type Queue struct { + h *C.struct_nfq_handle + qh *C.struct_nfq_q_handle + fd C.int + packets chan NFPacket + idx uint32 +} + +//Create and bind to queue specified by queueId +func NewQueue(queueId uint16, maxPacketsInQueue uint32, packetSize uint32) (*Queue, error) { + var q = Queue{ + idx: uint32(time.Now().UnixNano()), + packets: make(chan NFPacket), + } + var err error + var ret C.int + + if q.h, err = C.nfq_open(); err != nil { + return nil, fmt.Errorf("Error opening Queue handle: %v\n", err) + } else if ret, err = C.nfq_unbind_pf(q.h, AF_INET); err != nil || ret < 0 { + return nil, fmt.Errorf("Error unbinding existing q handler from AF_INET protocol family: %v\n", err) + } else if ret, err = C.nfq_unbind_pf(q.h, AF_INET6); err != nil || ret < 0 { + return nil, fmt.Errorf("Error unbinding existing q handler from AF_INET6 protocol family: %v\n", err) + } else if ret, err := C.nfq_bind_pf(q.h, AF_INET); err != nil || ret < 0 { + return nil, fmt.Errorf("Error binding to AF_INET protocol family: %v\n", err) + } else if ret, err := C.nfq_bind_pf(q.h, AF_INET6); err != nil || ret < 0 { + return nil, fmt.Errorf("Error binding to AF_INET6 protocol family: %v\n", err) + } + + queueIndexLock.Lock() + queueIndex[q.idx] = &q.packets + queueIndexLock.Unlock() + + if q.qh, err = C.CreateQueue(q.h, C.u_int16_t(queueId), C.u_int32_t(q.idx)); err != nil || q.qh == nil { + C.nfq_close(q.h) + return nil, fmt.Errorf("Error binding to queue: %v\n", err) + } else if ret, err = C.nfq_set_queue_maxlen(q.qh, C.u_int32_t(maxPacketsInQueue)); err != nil || ret < 0 { + C.nfq_destroy_queue(q.qh) + C.nfq_close(q.h) + return nil, fmt.Errorf("Unable to set max packets in queue: %v\n", err) + } else if C.nfq_set_mode(q.qh, C.u_int8_t(2), C.uint(packetSize)) < 0 { + C.nfq_destroy_queue(q.qh) + C.nfq_close(q.h) + return nil, fmt.Errorf("Unable to set packets copy mode: %v\n", err) + } else if q.fd, err = C.nfq_fd(q.h); err != nil { + C.nfq_destroy_queue(q.qh) + C.nfq_close(q.h) + return nil, fmt.Errorf("Unable to get queue file-descriptor. %v\n", err) + } + + go q.run() + + return &q, nil +} + +//Unbind and close the queue +func (q *Queue) Close() { + C.nfq_destroy_queue(q.qh) + C.nfq_close(q.h) + queueIndexLock.Lock() + delete(queueIndex, q.idx) + queueIndexLock.Unlock() +} + +//Get the channel for packets +func (q *Queue) Packets() <-chan NFPacket { + return q.packets +} + +func (q *Queue) run() { + if errno := C.Run(q.h, q.fd); errno != 0 { + fmt.Fprintf(os.Stderr, "Terminating, unable to receive packet due to errno=%d\n", errno) + } +} + +//export go_callback +func go_callback(queueId C.int, data *C.uchar, length C.int, mark C.uint, idx uint32, vc *VerdictContainerC) { + (*vc).verdict = C.uint(NF_ACCEPT) + (*vc).data = nil + (*vc).mark_set = 0 + (*vc).length = 0 + + queueIndexLock.RLock() + queueChannel, found := queueIndex[idx] + queueIndexLock.RUnlock() + if !found { + fmt.Fprintf(os.Stderr, "Unexpected queue idx %d\n", idx) + return + } + + xdata := C.GoBytes(unsafe.Pointer(data), length) + + var packet gopacket.Packet + if xdata[0]&0xf0 == ipv4version { + packet = gopacket.NewPacket(xdata, layers.LayerTypeIPv4, gopacketDecodeOptions) + } else { + packet = gopacket.NewPacket(xdata, layers.LayerTypeIPv6, gopacketDecodeOptions) + } + + p := NFPacket{ + verdictChannel: make(chan VerdictContainer), + Mark: uint32(mark), + Packet: packet, + } + + select { + case *queueChannel <- p: + select { + case v := <-p.verdictChannel: + if v.Packet == nil { + (*vc).verdict = C.uint(v.Verdict) + } else { + (*vc).verdict = C.uint(v.Verdict) + (*vc).data = (*C.uchar)(unsafe.Pointer(&v.Packet[0])) + (*vc).length = C.uint(len(v.Packet)) + } + + if v.Mark != 0 { + (*vc).mark_set = C.uint(1) + (*vc).mark = C.uint(v.Mark) + } + } + + default: + fmt.Fprintf(os.Stderr, "Error sending packet to queue channel %d\n", idx) + } +} diff --git a/daemon/netfilter/netfilter.h b/daemon/netfilter/queue.h similarity index 74% rename from daemon/netfilter/netfilter.h rename to daemon/netfilter/queue.h index c9d5dbc6..13bb59f7 100644 --- a/daemon/netfilter/netfilter.h +++ b/daemon/netfilter/queue.h @@ -1,5 +1,5 @@ -#ifndef _NETFILTER_H -#define _NETFILTER_H +#ifndef _NETFILTER_QUEUE_H +#define _NETFILTER_QUEUE_H #include #include @@ -23,20 +23,18 @@ typedef struct { extern void go_callback(int id, unsigned char* data, int len, uint mark, u_int32_t idx, verdictContainer *vc); -static int nf_callback(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, struct nfq_data *nfa, void *cb_func){ - uint32_t id = -1; +static int nf_callback(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, struct nfq_data *nfa, void *arg){ + uint32_t id = -1, idx = 0, mark = 0; struct nfqnl_msg_packet_hdr *ph = NULL; unsigned char *buffer = NULL; int ret = 0; - u_int32_t idx; - uint mark = 0; - verdictContainer vc; + verdictContainer vc = {0}; mark = nfq_get_nfmark(nfa); - ph = nfq_get_msg_packet_hdr(nfa); - id = ntohl(ph->packet_id); - ret = nfq_get_payload(nfa, &buffer); - idx = (uint32_t)((uintptr_t)cb_func); + ph = nfq_get_msg_packet_hdr(nfa); + id = ntohl(ph->packet_id); + ret = nfq_get_payload(nfa, &buffer); + idx = (uint32_t)((uintptr_t)arg); go_callback(id, buffer, ret, mark, idx, &vc); @@ -53,13 +51,12 @@ static inline struct nfq_q_handle* CreateQueue(struct nfq_handle *h, u_int16_t q static inline int Run(struct nfq_handle *h, int fd) { char buf[4096] __attribute__ ((aligned)); - int rv; + int rcvd, opt = 1; - int opt = 1; setsockopt(fd, SOL_NETLINK, NETLINK_NO_ENOBUFS, &opt, sizeof(int)); - while ((rv = recv(fd, buf, sizeof(buf), 0)) && rv >= 0) { - nfq_handle_packet(h, buf, rv); + while ((rcvd = recv(fd, buf, sizeof(buf), 0)) && rcvd >= 0) { + nfq_handle_packet(h, buf, rcvd); } return errno;