Skip to content

API Reference

This page lists the main modules and classes of sparse-qubo.

Public API (top-level package): Use sparse_qubo.create_constraint_dwave for D-Wave/dimod BQMs and sparse_qubo.create_constraint_amplify for Fixstars Amplify models. Constraint and network types are sparse_qubo.ConstraintType and sparse_qubo.NetworkType. The sections below document the implementation modules.

Core modules

sparse_qubo.core.constraint

Constraint types and QUBO construction for equality/inequality constraints.

This module provides ConstraintType, get_initial_nodes, and get_constraint_qubo to build QUBOs from switching networks for use with D-Wave or Amplify.

ConstraintType

Bases: StrEnum

Type of linear constraint on binary variables (sum of variables).

Source code in src/sparse_qubo/core/constraint.py
42
43
44
45
46
47
48
49
class ConstraintType(StrEnum):
    """Type of linear constraint on binary variables (sum of variables)."""

    ONE_HOT = "one_hot"  # Exactly one variable is 1
    EQUAL_TO = "equal_to"  # Sum equals c1
    LESS_EQUAL = "less_equal"  # Sum <= c1
    GREATER_EQUAL = "greater_equal"  # Sum >= c1
    CLAMP = "clamp"  # c1 <= sum <= c2

get_constraint_qubo(variables, constraint_type, network_type=NetworkType.DIVIDE_AND_CONQUER, c1=None, c2=None, threshold=None, reverse=False, var_prefix=None)

Build a QUBO for the given constraint.

Auxiliary variable prefixes

When var_prefix is None (the default), a unique prefix is assigned internally (C0, C1, ...) so that merging multiple constraint QUBOs into one BQM avoids name collisions. The counter increments on each call and is not reset automatically; it resets only on process start or when calling :func:reset_constraint_prefix_counter. See the Usage section (Constraint prefix counter) in the documentation for details.

Source code in src/sparse_qubo/core/constraint.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def get_constraint_qubo(
    variables: list[str],
    constraint_type: ConstraintType,
    network_type: NetworkType = NetworkType.DIVIDE_AND_CONQUER,
    c1: int | None = None,
    c2: int | None = None,
    threshold: int | None = None,
    reverse: bool = False,
    var_prefix: str | None = None,
) -> QUBO:
    """Build a QUBO for the given constraint.

    **Auxiliary variable prefixes**

    When ``var_prefix`` is ``None`` (the default), a unique prefix is assigned
    internally (``C0``, ``C1``, ...) so that merging multiple constraint QUBOs
    into one BQM avoids name collisions. The counter increments on each call
    and is **not** reset automatically; it resets only on process start or when
    calling :func:`reset_constraint_prefix_counter`. See the Usage section
    (Constraint prefix counter) in the documentation for details.
    """
    global _constraint_prefix_counter
    if var_prefix is None:
        var_prefix = f"C{_constraint_prefix_counter!s}"
        _constraint_prefix_counter += 1

    match network_type:
        case NetworkType.BENES:
            left_nodes, right_nodes = get_initial_nodes(variables, constraint_type, c1, c2, exponentiation=True)
            switches = BenesNetwork.generate_network(left_nodes, right_nodes, threshold=threshold, reverse=reverse)
        case NetworkType.BITONIC_SORT:
            left_nodes, right_nodes = get_initial_nodes(variables, constraint_type, c1, c2, exponentiation=True)
            switches = BitonicSortNetwork.generate_network(
                left_nodes, right_nodes, threshold=threshold, reverse=reverse
            )
        case NetworkType.BUBBLE_SORT:
            left_nodes, right_nodes = get_initial_nodes(variables, constraint_type, c1, c2)
            switches = BubbleSortNetwork.generate_network(left_nodes, right_nodes, threshold=threshold, reverse=reverse)
        case NetworkType.CLOS_NETWORK_MAX_DEGREE:
            left_nodes, right_nodes = get_initial_nodes(variables, constraint_type, c1, c2)
            switches = ClosNetworkWithMaxDegree.generate_network(
                left_nodes, right_nodes, threshold=threshold, reverse=reverse
            )
        case NetworkType.CLOS_NETWORK_MIN_EDGE:
            left_nodes, right_nodes = get_initial_nodes(variables, constraint_type, c1, c2)
            switches = ClosNetworkMinimumEdge.generate_network(
                left_nodes, right_nodes, threshold=threshold, reverse=reverse
            )
        case NetworkType.DIVIDE_AND_CONQUER:
            left_nodes, right_nodes = get_initial_nodes(variables, constraint_type, c1, c2)
            switches = DivideAndConquerNetwork.generate_network(
                left_nodes, right_nodes, threshold=threshold, reverse=reverse
            )
        case NetworkType.ODDEVEN_MERGE_SORT:
            left_nodes, right_nodes = get_initial_nodes(variables, constraint_type, c1, c2, exponentiation=True)
            switches = OddEvenMergeSortNetwork.generate_network(
                left_nodes, right_nodes, threshold=threshold, reverse=reverse
            )
        case _:
            raise NotImplementedError
    qubo = Switch.to_qubo(switches)
    qubo = _prefix_auxiliary_variables(qubo, set(variables), var_prefix)
    return qubo

get_initial_nodes(variables, constraint_type, c1=None, c2=None, exponentiation=False)

Build left and right VariableNode lists for a switching network from variable names and constraint type.

Used by get_constraint_qubo and by network implementations. If exponentiation is True, the right side is padded to a power-of-2 size (for Benes, Bitonic, OddEvenMergeSort).

Source code in src/sparse_qubo/core/constraint.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def get_initial_nodes(
    variables: list[str],
    constraint_type: ConstraintType,
    c1: int | None = None,
    c2: int | None = None,
    exponentiation: bool = False,
) -> tuple[list[VariableNode], list[VariableNode]]:
    """Build left and right VariableNode lists for a switching network from variable names and constraint type.

    Used by get_constraint_qubo and by network implementations. If exponentiation is True,
    the right side is padded to a power-of-2 size (for Benes, Bitonic, OddEvenMergeSort).
    """
    original_size = len(variables)
    target_size = original_size
    if exponentiation and original_size > 0:
        target_size = 1 << (original_size - 1).bit_length()
    pad_len = target_size - original_size

    left_pad_attrs = [NodeAttribute.ALWAYS_ZERO] * pad_len
    left_var_attrs = [NodeAttribute.ZERO_OR_ONE] * original_size

    def get_original_right_attr(i: int) -> NodeAttribute:
        match constraint_type:
            case ConstraintType.ONE_HOT:
                return NodeAttribute.ALWAYS_ZERO if i < original_size - 1 else NodeAttribute.ALWAYS_ONE
            case ConstraintType.EQUAL_TO:
                if c1 is None or not (0 <= c1 <= original_size):
                    raise ValueError(f"c1 must be between 0 and {original_size}")
                return NodeAttribute.ALWAYS_ZERO if i < original_size - c1 else NodeAttribute.ALWAYS_ONE
            case ConstraintType.LESS_EQUAL:
                if c1 is None or not (0 < c1 <= original_size):
                    raise ValueError(f"c1 must be between 0 and {original_size}")
                return NodeAttribute.ALWAYS_ZERO if i < original_size - c1 else NodeAttribute.NOT_CARE
            case ConstraintType.GREATER_EQUAL:
                if c1 is None or not (0 <= c1 < original_size):
                    raise ValueError(f"c1 must be between 0 and {original_size}")
                return NodeAttribute.NOT_CARE if i < original_size - c1 else NodeAttribute.ALWAYS_ONE
            case ConstraintType.CLAMP:
                if c1 is None or c2 is None or not (0 <= c1 <= c2 <= original_size):
                    raise ValueError(f"c1 and c2 must be valid range (0 <= c1 <= c2 <= {original_size})")
                if i < original_size - c2:
                    return NodeAttribute.ALWAYS_ZERO
                elif i < original_size - c1:
                    return NodeAttribute.NOT_CARE
                else:
                    return NodeAttribute.ALWAYS_ONE
            case _:
                raise NotImplementedError(f"Constraint type {constraint_type} is not supported")

    original_right_attrs = [get_original_right_attr(i) for i in range(original_size)]
    right_attrs = [NodeAttribute.ALWAYS_ZERO] * pad_len + original_right_attrs

    left_nodes = [VariableNode(name=f"L{i}", attribute=attr) for i, attr in enumerate(left_pad_attrs)] + [
        VariableNode(name=str(v), attribute=attr) for v, attr in zip(variables, left_var_attrs, strict=True)
    ]
    right_nodes = [VariableNode(name=f"R{i}", attribute=attr) for i, attr in enumerate(right_attrs)]

    return left_nodes, right_nodes

reset_constraint_prefix_counter()

Reset the internal constraint prefix counter.

The prefix counter is not reset automatically. It is reset only when:

  1. Process start: Restarting Python (e.g. re-running a script or restarting a Jupyter kernel) reloads the module and sets the counter to 0.
  2. Explicit call: Calling this function sets the counter back to 0.

Call this when you want the next constraint to use prefix C0 again (e.g. when starting to build a new model in the same process). Mainly useful for testing or reproducible variable names.

Source code in src/sparse_qubo/core/constraint.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def reset_constraint_prefix_counter() -> None:
    """Reset the internal constraint prefix counter.

    The prefix counter is **not** reset automatically. It is reset only when:

    1. **Process start**: Restarting Python (e.g. re-running a script or
       restarting a Jupyter kernel) reloads the module and sets the counter to 0.
    2. **Explicit call**: Calling this function sets the counter back to 0.

    Call this when you want the next constraint to use prefix ``C0`` again
    (e.g. when starting to build a new model in the same process). Mainly useful
    for testing or reproducible variable names.
    """
    global _constraint_prefix_counter
    _constraint_prefix_counter = 0

sparse_qubo.core.network

Switching network types and base class for network implementations.

NetworkType enumerates available formulations. ISwitchingNetwork is the abstract base for networks that produce a list of Switch elements.

ISwitchingNetwork

Bases: ABC

Abstract base for switching networks that produce a list of Switch elements.

Source code in src/sparse_qubo/core/network.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class ISwitchingNetwork(ABC):
    """Abstract base for switching networks that produce a list of Switch elements."""

    @classmethod
    @abstractmethod
    def _generate_original_network(
        cls,
        left_nodes: list[VariableNode],
        right_nodes: list[VariableNode],
        threshold: int | None = None,
        reverse: bool = False,
    ) -> list[Switch]:
        """Return the raw list of Switch elements for the given left/right nodes."""
        pass

    @classmethod
    def generate_network(
        cls,
        left_nodes: list[VariableNode],
        right_nodes: list[VariableNode],
        threshold: int | None = None,
        reverse: bool = False,
    ) -> list[Switch]:
        """Build the switching network, simplifying switches when nodes are fixed (ALWAYS_ZERO/ALWAYS_ONE)."""
        network: list[Switch] = cls._generate_original_network(left_nodes, right_nodes, threshold, reverse)

        # Place switch while managing the set of rightmost nodes
        current_nodes: set[str] = {node.name for node in right_nodes}
        name_to_attribute: dict[str, NodeAttribute] = {node.name: node.attribute for node in right_nodes}
        result_network: list[Switch] = []
        for switch in network[::-1]:  # Look from the right
            # Raise an error if there are no nodes to connect
            if not switch.right_nodes.issubset(current_nodes):
                raise ValueError(f"Invalid network: {switch.right_nodes} is not subset of {current_nodes}")
            current_nodes.difference_update(switch.right_nodes)

            # Raise an error if nodes that should be newly generated already exist
            if not switch.left_nodes.isdisjoint(current_nodes):
                raise ValueError(f"Invalid network: {switch.left_nodes} is not disjoint with {current_nodes}")
            current_nodes.update(switch.left_nodes)

            # Calculate the range of possible values for left and right
            num_left_variables: int = len(switch.left_nodes)
            right_sum_min: int = (
                len([node for node in switch.right_nodes if name_to_attribute[node] == NodeAttribute.ALWAYS_ONE])
                + switch.right_constant
                - switch.left_constant
            )
            right_sum_max: int = (
                len([node for node in switch.right_nodes if name_to_attribute[node] != NodeAttribute.ALWAYS_ZERO])
                + switch.right_constant
                - switch.left_constant
            )

            # Raise an error if the range is not achievable
            if not (right_sum_max >= 0 and right_sum_min <= num_left_variables):
                raise ValueError(
                    f"Invalid network: right_sum_max = {right_sum_max} < 0 or right_sum_min = {right_sum_min} > {num_left_variables}"
                )

            # When the right side's lower bound matches the left side's upper bound, all left nodes must be 1
            if right_sum_min == num_left_variables:
                for node in switch.left_nodes:
                    name_to_attribute[node] = NodeAttribute.ALWAYS_ONE
                    result_network.append(
                        Switch(
                            left_nodes=frozenset([node]),
                            right_nodes=frozenset(),
                            left_constant=0,
                            right_constant=1,
                        )
                    )
            # When the right side's upper bound is 0, all left nodes must be 0
            elif right_sum_max == 0:
                for node in switch.left_nodes:
                    name_to_attribute[node] = NodeAttribute.ALWAYS_ZERO
                    result_network.append(
                        Switch(
                            left_nodes=frozenset([node]),
                            right_nodes=frozenset(),
                            left_constant=0,
                            right_constant=0,
                        )
                    )
            # When all right nodes are NOT_CARE and there are no restrictions on the left side's possible value range, all left nodes become NOT_CARE
            elif (
                all(name_to_attribute[node] == NodeAttribute.NOT_CARE for node in switch.right_nodes)
                and right_sum_min <= 0
                and right_sum_max >= num_left_variables
            ):
                for node in switch.left_nodes:
                    name_to_attribute[node] = NodeAttribute.NOT_CARE
            # Otherwise, left nodes become normal nodes
            else:
                for node in switch.left_nodes:
                    name_to_attribute[node] = NodeAttribute.ZERO_OR_ONE
                # Add network with constant nodes omitted
                result_network.append(
                    Switch(
                        left_nodes=frozenset(switch.left_nodes),
                        right_nodes=frozenset([
                            node
                            for node in switch.right_nodes
                            if name_to_attribute[node] != NodeAttribute.ALWAYS_ONE
                            and name_to_attribute[node] != NodeAttribute.ALWAYS_ZERO
                        ]),
                        left_constant=switch.left_constant,
                        right_constant=switch.right_constant
                        + len([
                            node for node in switch.right_nodes if name_to_attribute[node] == NodeAttribute.ALWAYS_ONE
                        ]),
                    )
                )
        if reverse:
            return result_network[::-1]
        else:
            return result_network

generate_network(left_nodes, right_nodes, threshold=None, reverse=False) classmethod

Build the switching network, simplifying switches when nodes are fixed (ALWAYS_ZERO/ALWAYS_ONE).

Source code in src/sparse_qubo/core/network.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
@classmethod
def generate_network(
    cls,
    left_nodes: list[VariableNode],
    right_nodes: list[VariableNode],
    threshold: int | None = None,
    reverse: bool = False,
) -> list[Switch]:
    """Build the switching network, simplifying switches when nodes are fixed (ALWAYS_ZERO/ALWAYS_ONE)."""
    network: list[Switch] = cls._generate_original_network(left_nodes, right_nodes, threshold, reverse)

    # Place switch while managing the set of rightmost nodes
    current_nodes: set[str] = {node.name for node in right_nodes}
    name_to_attribute: dict[str, NodeAttribute] = {node.name: node.attribute for node in right_nodes}
    result_network: list[Switch] = []
    for switch in network[::-1]:  # Look from the right
        # Raise an error if there are no nodes to connect
        if not switch.right_nodes.issubset(current_nodes):
            raise ValueError(f"Invalid network: {switch.right_nodes} is not subset of {current_nodes}")
        current_nodes.difference_update(switch.right_nodes)

        # Raise an error if nodes that should be newly generated already exist
        if not switch.left_nodes.isdisjoint(current_nodes):
            raise ValueError(f"Invalid network: {switch.left_nodes} is not disjoint with {current_nodes}")
        current_nodes.update(switch.left_nodes)

        # Calculate the range of possible values for left and right
        num_left_variables: int = len(switch.left_nodes)
        right_sum_min: int = (
            len([node for node in switch.right_nodes if name_to_attribute[node] == NodeAttribute.ALWAYS_ONE])
            + switch.right_constant
            - switch.left_constant
        )
        right_sum_max: int = (
            len([node for node in switch.right_nodes if name_to_attribute[node] != NodeAttribute.ALWAYS_ZERO])
            + switch.right_constant
            - switch.left_constant
        )

        # Raise an error if the range is not achievable
        if not (right_sum_max >= 0 and right_sum_min <= num_left_variables):
            raise ValueError(
                f"Invalid network: right_sum_max = {right_sum_max} < 0 or right_sum_min = {right_sum_min} > {num_left_variables}"
            )

        # When the right side's lower bound matches the left side's upper bound, all left nodes must be 1
        if right_sum_min == num_left_variables:
            for node in switch.left_nodes:
                name_to_attribute[node] = NodeAttribute.ALWAYS_ONE
                result_network.append(
                    Switch(
                        left_nodes=frozenset([node]),
                        right_nodes=frozenset(),
                        left_constant=0,
                        right_constant=1,
                    )
                )
        # When the right side's upper bound is 0, all left nodes must be 0
        elif right_sum_max == 0:
            for node in switch.left_nodes:
                name_to_attribute[node] = NodeAttribute.ALWAYS_ZERO
                result_network.append(
                    Switch(
                        left_nodes=frozenset([node]),
                        right_nodes=frozenset(),
                        left_constant=0,
                        right_constant=0,
                    )
                )
        # When all right nodes are NOT_CARE and there are no restrictions on the left side's possible value range, all left nodes become NOT_CARE
        elif (
            all(name_to_attribute[node] == NodeAttribute.NOT_CARE for node in switch.right_nodes)
            and right_sum_min <= 0
            and right_sum_max >= num_left_variables
        ):
            for node in switch.left_nodes:
                name_to_attribute[node] = NodeAttribute.NOT_CARE
        # Otherwise, left nodes become normal nodes
        else:
            for node in switch.left_nodes:
                name_to_attribute[node] = NodeAttribute.ZERO_OR_ONE
            # Add network with constant nodes omitted
            result_network.append(
                Switch(
                    left_nodes=frozenset(switch.left_nodes),
                    right_nodes=frozenset([
                        node
                        for node in switch.right_nodes
                        if name_to_attribute[node] != NodeAttribute.ALWAYS_ONE
                        and name_to_attribute[node] != NodeAttribute.ALWAYS_ZERO
                    ]),
                    left_constant=switch.left_constant,
                    right_constant=switch.right_constant
                    + len([
                        node for node in switch.right_nodes if name_to_attribute[node] == NodeAttribute.ALWAYS_ONE
                    ]),
                )
            )
    if reverse:
        return result_network[::-1]
    else:
        return result_network

NetworkType

Bases: StrEnum

Identifier for each switching network (or naive) formulation.

Source code in src/sparse_qubo/core/network.py
14
15
16
17
18
19
20
21
22
23
24
class NetworkType(StrEnum):
    """Identifier for each switching network (or naive) formulation."""

    NAIVE = "naive"
    BENES = "benes"
    BITONIC_SORT = "bitonic_sort"
    BUBBLE_SORT = "bubble_sort"
    CLOS_NETWORK_MAX_DEGREE = "clos_network_max_degree"
    CLOS_NETWORK_MIN_EDGE = "clos_network_min_edge"
    DIVIDE_AND_CONQUER = "divide_and_conquer"
    ODDEVEN_MERGE_SORT = "oddeven_merge_sort"

sparse_qubo.core.node

Nodes and attributes for switching network construction.

VariableNode represents a binary variable with an optional NodeAttribute (ALWAYS_ZERO, ALWAYS_ONE, NOT_CARE, or ZERO_OR_ONE).

NodeAttribute

Bases: Enum

Attribute of a binary variable in the switching network.

Source code in src/sparse_qubo/core/node.py
12
13
14
15
16
17
18
class NodeAttribute(Enum):
    """Attribute of a binary variable in the switching network."""

    ZERO_OR_ONE = "ZERO_OR_ONE"  # General case
    ALWAYS_ZERO = "ALWAYS_ZERO"  # Fixed to 0
    ALWAYS_ONE = "ALWAYS_ONE"  # Fixed to 1
    NOT_CARE = "NOT_CARE"  # Unconstrained

VariableNode

Bases: BaseModel

A binary variable with a name and an attribute.

Source code in src/sparse_qubo/core/node.py
21
22
23
24
25
class VariableNode(BaseModel):
    """A binary variable with a name and an attribute."""

    name: str = Field(description="Name of the binary variable", frozen=True)
    attribute: NodeAttribute = Field(default=NodeAttribute.ZERO_OR_ONE, description="Attribute of the binary variable")

sparse_qubo.core.switch

Switch elements and QUBO conversion for switching networks.

A Switch encodes a constraint that the sum of left variables plus left_constant equals the sum of right variables plus right_constant. Switch.to_qubo converts a list of Switch elements into a single QUBO (variables, linear, quadratic, constant).

QUBO

Bases: BaseModel

QUBO representation: variables, linear and quadratic coefficients, and constant.

Source code in src/sparse_qubo/core/switch.py
19
20
21
22
23
24
25
class QUBO(BaseModel):
    """QUBO representation: variables, linear and quadratic coefficients, and constant."""

    variables: frozenset[str] = Field(description="Set of binary variables")
    quadratic: dict[frozenset[str], float] = Field(description="Coefficients between binary variables")
    linear: dict[str, float] = Field(description="Coefficients of binary variables")
    constant: float = Field(default=0, description="Constant term")

Switch

Bases: BaseModel

Single switch: left and right variable sets and optional integer constants.

The constraint is: sum(left_nodes) + left_constant == sum(right_nodes) + right_constant.

Source code in src/sparse_qubo/core/switch.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
class Switch(BaseModel):
    """Single switch: left and right variable sets and optional integer constants.

    The constraint is: sum(left_nodes) + left_constant == sum(right_nodes) + right_constant.
    """

    left_nodes: frozenset[str] = Field(default_factory=frozenset, description="Binary variables (left side)")
    right_nodes: frozenset[str] = Field(default_factory=frozenset, description="Binary variables (right side)")
    left_constant: int = Field(default=0, description="Constant term (left side)")
    right_constant: int = Field(default=0, description="Constant term (right side)")

    def __post_init__(self) -> None:
        # Check for duplicate variables
        if len(self.left_nodes | self.right_nodes) != len(self.left_nodes) + len(self.right_nodes):
            raise ValueError("Duplicate variables found between left_nodes and right_nodes")

    def __repr__(self) -> str:
        return f"Switch(left={sorted(self.left_nodes)} + {self.left_constant}, right={sorted(self.right_nodes)} + {self.right_constant})"

    @property
    def num_variables(self) -> int:
        """Number of variables in this switch (left + right)."""
        return len(self.left_nodes) + len(self.right_nodes)

    @property
    def num_edges(self) -> int:
        """Number of quadratic terms if this switch were converted to QUBO alone."""
        return self.num_variables * (self.num_variables - 1) // 2

    @classmethod
    def to_qubo(cls, switches: list[Self]) -> QUBO:
        """Convert a list of Switch elements into a single QUBO (sum of (L + c_L - R - c_R)^2 terms)."""
        variables: set[str] = set()
        quadratic: dict[frozenset[str], float] = defaultdict(float)
        linear: dict[str, float] = defaultdict(float)
        constant: float = 0

        # Constant terms are not discarded
        # At the call stage, each node in Switch is either NOT_CARE or ZERO_OR_ONE
        # ALWAYS_ZERO and ALWAYS_ONE are processed as constant terms
        # NOT_CARE is treated the same as ZERO_OR_ONE
        for switch in switches:
            # (L1 + L2 - R1 - R2 + C)^2
            # = 2L1L2 + 2R1R2 - 2(L1R1 + ...)
            # + L1 + L2 + R1 + R2 + 2C(L1 + L2 - R1 - R2)
            # + C^2

            switch_constant = switch.left_constant - switch.right_constant
            variables.update(switch.left_nodes)
            variables.update(switch.right_nodes)
            # quadratic
            for node1, node2 in combinations(switch.left_nodes, 2):
                quadratic[frozenset((node1, node2))] += 2
            for node1, node2 in combinations(switch.right_nodes, 2):
                quadratic[frozenset((node1, node2))] += 2
            for node1, node2 in product(switch.left_nodes, switch.right_nodes):
                quadratic[frozenset((node1, node2))] -= 2
            # linear
            for node in switch.left_nodes:
                linear[node] += 2 * switch_constant
                linear[node] += 1  # Because x*x = x
            for node in switch.right_nodes:
                linear[node] -= 2 * switch_constant
                linear[node] += 1
            # constant
            constant += switch_constant**2
        qubo = QUBO(
            variables=frozenset(variables),
            quadratic=quadratic,
            linear=linear,
            constant=constant,
        )

        return qubo

    @classmethod
    def left_node_to_switch(cls, switches: list[Self]) -> dict[str, int]:
        """Map each left-side variable name to the index of the switch that contains it."""
        left_node_to_switch: dict[str, int] = {}
        for idx, switch in enumerate(switches):
            for node in switch.left_nodes:
                left_node_to_switch[node] = idx
        return left_node_to_switch

    @classmethod
    def right_node_to_switch(cls, switches: list[Self]) -> dict[str, int]:
        """Map each right-side variable name to the index of the switch that contains it."""
        right_node_to_switch: dict[str, int] = {}
        for idx, switch in enumerate(switches):
            for node in switch.right_nodes:
                right_node_to_switch[node] = idx
        return right_node_to_switch

    @classmethod
    def determine_layer_structure(
        cls,
        switches: list[Self],
    ) -> dict[int, list[int]]:
        """Determine which layer each switch belongs to. Returns {layer_number: [switch_indices]} (0-indexed)."""
        left_node_to_switch = cls.left_node_to_switch(switches)
        layer_structure: dict[int, list[int]] = {}
        switch_idx_to_layer_number: dict[int, int] = {}

        # Identify original variables (L0, L1, ...)
        waiting_left_nodes: deque[tuple[str, int]] = deque()  # (node, layer_number)
        for switch in switches:
            for node in switch.left_nodes:
                if re.match(r"^L\d+$", node):
                    waiting_left_nodes.append((node, 0))

        # Determine the layer of each switch
        while waiting_left_nodes:
            node, layer_number = waiting_left_nodes.pop()
            if node not in left_node_to_switch:
                continue
            if left_node_to_switch[node] in switch_idx_to_layer_number:
                continue
            switch_idx_to_layer_number[left_node_to_switch[node]] = layer_number
            waiting_left_nodes.extend(
                (node, layer_number + 1) for node in switches[left_node_to_switch[node]].right_nodes
            )

        for switch_idx, layer_number in switch_idx_to_layer_number.items():
            if layer_number not in layer_structure:
                layer_structure[layer_number] = []
            layer_structure[layer_number].append(switch_idx)

        return layer_structure

    @classmethod
    def visualize_switches(
        cls,
        switches: list["Switch"],
        output_path: str,
        layout_type: str = "network",  # "network" or "spring"
        layer_spacing: float = 2.0,
        node_spacing: float = 1.0,
    ) -> None:
        """Draw the relationship between variables and switches as a graph and save to output_path."""
        all_nodes: set[str] = set()
        for switch in switches:
            all_nodes.update(switch.left_nodes)
            all_nodes.update(switch.right_nodes)

        # Generate colors for each switch (same as generate_and_draw_qubo_network)
        cmap = plt.get_cmap("tab10")
        switch_colors = [cmap(i % 10) for i in range(len(switches))]

        graph: nx.Graph[str] = nx.Graph()
        graph.add_nodes_from(all_nodes, node_color="blue")

        # Set colors for switch nodes
        for switch_idx in range(len(switches)):
            graph.add_node("C" + str(switch_idx), node_color=switch_colors[switch_idx])

        # Add constant nodes and set colors for edges
        for switch_idx, switch in enumerate(switches):
            switch_color = switch_colors[switch_idx]
            for node in switch.left_nodes:
                graph.add_edge(node, "C" + str(switch_idx), edge_color=switch_color)
            for node in switch.right_nodes:
                graph.add_edge(node, "C" + str(switch_idx), edge_color=switch_color)
            if switch.left_constant != 0:
                const_node = f"LC_{switch_idx}_{switch.left_constant}"
                graph.add_node(const_node, node_color="g")
                graph.add_edge(
                    const_node,
                    "C" + str(switch_idx),
                    edge_color=switch_color,
                )
            if switch.right_constant != 0:
                const_node = f"RC_{switch_idx}_{switch.right_constant}"
                graph.add_node(const_node, node_color="green")
                graph.add_edge(
                    "C" + str(switch_idx),
                    const_node,
                    edge_color=switch_color,
                )

        # Determine layout
        pos: dict[str, tuple[float, float]]
        if layout_type == "network":
            pos = cls._create_network_layout_with_layers(
                switches,
                all_nodes,
                layer_spacing,
                node_spacing,
            )
        else:
            raw_pos = nx.spring_layout(graph)
            pos = {node: (float(p[0]), float(p[1])) for node, p in raw_pos.items()}

        edge_colors: list[str] = [graph[u][v]["edge_color"] for u, v in graph.edges()]

        # Draw the graph
        plt.figure(figsize=(max(12, len(switches) * 1.5), 10))

        # Draw nodes by type
        # Blue nodes (circle)
        nx.draw_networkx_nodes(
            graph,
            pos,
            nodelist=list(all_nodes),
            node_color="blue",
            node_size=1500,
            node_shape="o",
            alpha=0.5,
        )
        # switch nodes (square, different color for each switch)
        for switch_idx in range(len(switches)):
            switch_node = "C" + str(switch_idx)
            nx.draw_networkx_nodes(
                graph,
                pos,
                nodelist=[switch_node],
                node_color=[switch_colors[switch_idx]],
                node_size=5000,
                node_shape="s",
                alpha=0.5,
            )
        # Green constant nodes (circle)
        const_nodes = [node for node in graph.nodes() if node.startswith("LC_") or node.startswith("RC_")]
        if const_nodes:
            nx.draw_networkx_nodes(
                graph,
                pos,
                nodelist=const_nodes,
                node_color="green",
                node_size=1500,
                node_shape="o",
                alpha=0.5,
            )

        # Draw edges and labels
        nx.draw_networkx_edges(
            graph,
            pos,
            edge_color=edge_colors,
        )
        nx.draw_networkx_labels(
            graph,
            pos,
            font_size=12,
            font_weight="bold",
        )

        # Display layer information
        if layout_type == "network":
            layer_structure = cls.determine_layer_structure(switches)
            cls._add_layer_labels(layer_structure, layer_spacing)
            plt.title(
                f"Switches Network ({len(layer_structure)} layers)",
                fontsize=16,
                fontweight="bold",
            )

        plt.tight_layout()
        plt.savefig(os.path.join(output_path, "switches.png"), dpi=300, bbox_inches="tight")
        plt.close()

    @classmethod
    def _create_network_layout_with_layers(
        cls,
        switches: list["Switch"],
        all_nodes: set[str],
        layer_spacing: float,
        node_spacing: float,
    ) -> dict[str, tuple[float, float]]:
        """
        Create a manual layout based on layer structure
        Layers are arranged to the right, and switches within each layer are arranged vertically
        """
        pos = {}

        # Determine layer structure
        layer_structure = cls.determine_layer_structure(switches)

        # Calculate position of each layer
        for layer_num, switch_indices in sorted(layer_structure.items()):
            x = (layer_num - 1) * layer_spacing

            # Arrange switches vertically within each layer
            for i, switch_idx in enumerate(switch_indices):
                switch = switches[switch_idx]
                switch_y = -(i - len(switch_indices) / 2) * node_spacing

                # Place switch node
                pos[f"C{switch_idx}"] = (x, switch_y)

                # Place left nodes on the left (smaller index is higher, center aligns with switch node)
                left_x_start = x - 0.8
                sorted_left_nodes = sorted(switch.left_nodes)
                for j, node in enumerate(sorted_left_nodes):
                    y = switch_y - (j - (len(sorted_left_nodes) - 1) / 2) * 0.3
                    pos[node] = (left_x_start, y)

                # Place right nodes on the right (smaller index is higher, center aligns with switch node)
                right_x_start = x + 0.8
                sorted_right_nodes = sorted(switch.right_nodes)
                for j, node in enumerate(sorted_right_nodes):
                    y = switch_y - (j - (len(sorted_right_nodes) - 1) / 2) * 0.3
                    pos[node] = (right_x_start, y)

                # Place constant nodes (same height as switch node)
                if switch.left_constant != 0:
                    const_node = f"LC_{switch_idx}_{switch.left_constant}"
                    pos[const_node] = (left_x_start, switch_y)
                if switch.right_constant != 0:
                    const_node = f"RC_{switch_idx}_{switch.right_constant}"
                    pos[const_node] = (right_x_start, switch_y)

        # Place other nodes (unconnected nodes) in appropriate positions
        used_nodes = set(pos.keys())
        unused_nodes = all_nodes - used_nodes

        if unused_nodes:
            # Place unused nodes to the right of the last layer
            max_layer = max(layer_structure.keys()) if layer_structure else 1
            last_x = (max_layer - 1) * layer_spacing + 1.5
            for i, node in enumerate(sorted(unused_nodes)):
                pos[node] = (last_x, -2.0 - i * node_spacing)

        return pos

    @classmethod
    def _add_layer_labels(
        cls,
        layer_structure: dict[int, list[int]],
        layer_spacing: float,
    ) -> None:
        """
        Add layer labels
        """
        for layer_num, _ in layer_structure.items():
            x = (layer_num - 1) * layer_spacing
            # Place layer labels at the top of each layer
            plt.text(
                x,
                3.0,
                f"Layer {layer_num}",
                ha="center",
                va="bottom",
                fontsize=14,
                fontweight="bold",
                bbox={"boxstyle": "round,pad=0.3", "facecolor": "lightblue", "alpha": 0.7},
            )

num_edges property

Number of quadratic terms if this switch were converted to QUBO alone.

num_variables property

Number of variables in this switch (left + right).

determine_layer_structure(switches) classmethod

Determine which layer each switch belongs to. Returns {layer_number: [switch_indices]} (0-indexed).

Source code in src/sparse_qubo/core/switch.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@classmethod
def determine_layer_structure(
    cls,
    switches: list[Self],
) -> dict[int, list[int]]:
    """Determine which layer each switch belongs to. Returns {layer_number: [switch_indices]} (0-indexed)."""
    left_node_to_switch = cls.left_node_to_switch(switches)
    layer_structure: dict[int, list[int]] = {}
    switch_idx_to_layer_number: dict[int, int] = {}

    # Identify original variables (L0, L1, ...)
    waiting_left_nodes: deque[tuple[str, int]] = deque()  # (node, layer_number)
    for switch in switches:
        for node in switch.left_nodes:
            if re.match(r"^L\d+$", node):
                waiting_left_nodes.append((node, 0))

    # Determine the layer of each switch
    while waiting_left_nodes:
        node, layer_number = waiting_left_nodes.pop()
        if node not in left_node_to_switch:
            continue
        if left_node_to_switch[node] in switch_idx_to_layer_number:
            continue
        switch_idx_to_layer_number[left_node_to_switch[node]] = layer_number
        waiting_left_nodes.extend(
            (node, layer_number + 1) for node in switches[left_node_to_switch[node]].right_nodes
        )

    for switch_idx, layer_number in switch_idx_to_layer_number.items():
        if layer_number not in layer_structure:
            layer_structure[layer_number] = []
        layer_structure[layer_number].append(switch_idx)

    return layer_structure

left_node_to_switch(switches) classmethod

Map each left-side variable name to the index of the switch that contains it.

Source code in src/sparse_qubo/core/switch.py
103
104
105
106
107
108
109
110
@classmethod
def left_node_to_switch(cls, switches: list[Self]) -> dict[str, int]:
    """Map each left-side variable name to the index of the switch that contains it."""
    left_node_to_switch: dict[str, int] = {}
    for idx, switch in enumerate(switches):
        for node in switch.left_nodes:
            left_node_to_switch[node] = idx
    return left_node_to_switch

right_node_to_switch(switches) classmethod

Map each right-side variable name to the index of the switch that contains it.

Source code in src/sparse_qubo/core/switch.py
112
113
114
115
116
117
118
119
@classmethod
def right_node_to_switch(cls, switches: list[Self]) -> dict[str, int]:
    """Map each right-side variable name to the index of the switch that contains it."""
    right_node_to_switch: dict[str, int] = {}
    for idx, switch in enumerate(switches):
        for node in switch.right_nodes:
            right_node_to_switch[node] = idx
    return right_node_to_switch

to_qubo(switches) classmethod

Convert a list of Switch elements into a single QUBO (sum of (L + c_L - R - c_R)^2 terms).

Source code in src/sparse_qubo/core/switch.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@classmethod
def to_qubo(cls, switches: list[Self]) -> QUBO:
    """Convert a list of Switch elements into a single QUBO (sum of (L + c_L - R - c_R)^2 terms)."""
    variables: set[str] = set()
    quadratic: dict[frozenset[str], float] = defaultdict(float)
    linear: dict[str, float] = defaultdict(float)
    constant: float = 0

    # Constant terms are not discarded
    # At the call stage, each node in Switch is either NOT_CARE or ZERO_OR_ONE
    # ALWAYS_ZERO and ALWAYS_ONE are processed as constant terms
    # NOT_CARE is treated the same as ZERO_OR_ONE
    for switch in switches:
        # (L1 + L2 - R1 - R2 + C)^2
        # = 2L1L2 + 2R1R2 - 2(L1R1 + ...)
        # + L1 + L2 + R1 + R2 + 2C(L1 + L2 - R1 - R2)
        # + C^2

        switch_constant = switch.left_constant - switch.right_constant
        variables.update(switch.left_nodes)
        variables.update(switch.right_nodes)
        # quadratic
        for node1, node2 in combinations(switch.left_nodes, 2):
            quadratic[frozenset((node1, node2))] += 2
        for node1, node2 in combinations(switch.right_nodes, 2):
            quadratic[frozenset((node1, node2))] += 2
        for node1, node2 in product(switch.left_nodes, switch.right_nodes):
            quadratic[frozenset((node1, node2))] -= 2
        # linear
        for node in switch.left_nodes:
            linear[node] += 2 * switch_constant
            linear[node] += 1  # Because x*x = x
        for node in switch.right_nodes:
            linear[node] -= 2 * switch_constant
            linear[node] += 1
        # constant
        constant += switch_constant**2
    qubo = QUBO(
        variables=frozenset(variables),
        quadratic=quadratic,
        linear=linear,
        constant=constant,
    )

    return qubo

visualize_switches(switches, output_path, layout_type='network', layer_spacing=2.0, node_spacing=1.0) classmethod

Draw the relationship between variables and switches as a graph and save to output_path.

Source code in src/sparse_qubo/core/switch.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
@classmethod
def visualize_switches(
    cls,
    switches: list["Switch"],
    output_path: str,
    layout_type: str = "network",  # "network" or "spring"
    layer_spacing: float = 2.0,
    node_spacing: float = 1.0,
) -> None:
    """Draw the relationship between variables and switches as a graph and save to output_path."""
    all_nodes: set[str] = set()
    for switch in switches:
        all_nodes.update(switch.left_nodes)
        all_nodes.update(switch.right_nodes)

    # Generate colors for each switch (same as generate_and_draw_qubo_network)
    cmap = plt.get_cmap("tab10")
    switch_colors = [cmap(i % 10) for i in range(len(switches))]

    graph: nx.Graph[str] = nx.Graph()
    graph.add_nodes_from(all_nodes, node_color="blue")

    # Set colors for switch nodes
    for switch_idx in range(len(switches)):
        graph.add_node("C" + str(switch_idx), node_color=switch_colors[switch_idx])

    # Add constant nodes and set colors for edges
    for switch_idx, switch in enumerate(switches):
        switch_color = switch_colors[switch_idx]
        for node in switch.left_nodes:
            graph.add_edge(node, "C" + str(switch_idx), edge_color=switch_color)
        for node in switch.right_nodes:
            graph.add_edge(node, "C" + str(switch_idx), edge_color=switch_color)
        if switch.left_constant != 0:
            const_node = f"LC_{switch_idx}_{switch.left_constant}"
            graph.add_node(const_node, node_color="g")
            graph.add_edge(
                const_node,
                "C" + str(switch_idx),
                edge_color=switch_color,
            )
        if switch.right_constant != 0:
            const_node = f"RC_{switch_idx}_{switch.right_constant}"
            graph.add_node(const_node, node_color="green")
            graph.add_edge(
                "C" + str(switch_idx),
                const_node,
                edge_color=switch_color,
            )

    # Determine layout
    pos: dict[str, tuple[float, float]]
    if layout_type == "network":
        pos = cls._create_network_layout_with_layers(
            switches,
            all_nodes,
            layer_spacing,
            node_spacing,
        )
    else:
        raw_pos = nx.spring_layout(graph)
        pos = {node: (float(p[0]), float(p[1])) for node, p in raw_pos.items()}

    edge_colors: list[str] = [graph[u][v]["edge_color"] for u, v in graph.edges()]

    # Draw the graph
    plt.figure(figsize=(max(12, len(switches) * 1.5), 10))

    # Draw nodes by type
    # Blue nodes (circle)
    nx.draw_networkx_nodes(
        graph,
        pos,
        nodelist=list(all_nodes),
        node_color="blue",
        node_size=1500,
        node_shape="o",
        alpha=0.5,
    )
    # switch nodes (square, different color for each switch)
    for switch_idx in range(len(switches)):
        switch_node = "C" + str(switch_idx)
        nx.draw_networkx_nodes(
            graph,
            pos,
            nodelist=[switch_node],
            node_color=[switch_colors[switch_idx]],
            node_size=5000,
            node_shape="s",
            alpha=0.5,
        )
    # Green constant nodes (circle)
    const_nodes = [node for node in graph.nodes() if node.startswith("LC_") or node.startswith("RC_")]
    if const_nodes:
        nx.draw_networkx_nodes(
            graph,
            pos,
            nodelist=const_nodes,
            node_color="green",
            node_size=1500,
            node_shape="o",
            alpha=0.5,
        )

    # Draw edges and labels
    nx.draw_networkx_edges(
        graph,
        pos,
        edge_color=edge_colors,
    )
    nx.draw_networkx_labels(
        graph,
        pos,
        font_size=12,
        font_weight="bold",
    )

    # Display layer information
    if layout_type == "network":
        layer_structure = cls.determine_layer_structure(switches)
        cls._add_layer_labels(layer_structure, layer_spacing)
        plt.title(
            f"Switches Network ({len(layer_structure)} layers)",
            fontsize=16,
            fontweight="bold",
        )

    plt.tight_layout()
    plt.savefig(os.path.join(output_path, "switches.png"), dpi=300, bbox_inches="tight")
    plt.close()

Network implementations

sparse_qubo.networks.benes_network

Implementation of Benes network

BenesNetwork

Bases: ClosNetworkBase

Benes network implementation; requires power-of-2 variable count.

Source code in src/sparse_qubo/networks/benes_network.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class BenesNetwork(ClosNetworkBase):
    """Benes network implementation; requires power-of-2 variable count."""

    @classmethod
    def _implement_if_small(cls, left_nodes: list[str], right_nodes: list[str]) -> list[Switch] | None:
        N = max(len(left_nodes), len(right_nodes))
        if N <= 2:
            return [
                Switch(
                    left_nodes=frozenset(left_nodes),
                    right_nodes=frozenset(right_nodes),
                )
            ]
        return None

    @classmethod
    def _determine_switch_sizes(cls, N_left: int, N_right: int) -> tuple[int, int]:
        """Return (n, r) for Benes network with n*r >= N."""
        N = max(N_left, N_right)
        n, r = 2, 1
        while n * r < N:
            r *= 2
        return n, r

sparse_qubo.networks.bitonic_sort_network

Implementation of Bitonic sort network

BitonicSortNetwork

Bases: ISwitchingNetwork

Bitonic sort network; requires power-of-2 variable count.

Source code in src/sparse_qubo/networks/bitonic_sort_network.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class BitonicSortNetwork(ISwitchingNetwork):
    """Bitonic sort network; requires power-of-2 variable count."""

    @classmethod
    def _generate_original_network(
        cls,
        left_nodes: list[VariableNode],
        right_nodes: list[VariableNode],
        threshold: int | None = None,
        reverse: bool = False,
    ) -> list[Switch]:
        """Return the list of Switch elements for the bitonic sort network."""
        left_names = [node.name for node in left_nodes]
        right_names = [node.name for node in right_nodes]
        if len(left_names) != len(right_names):
            raise ValueError("left_nodes and right_nodes must have the same length")
        N: int = len(left_names)
        n: int = round(log2(N))
        if 2**n != N:
            raise ValueError("N must be a power of 2")
        if not reverse:
            left_names, right_names = right_names, left_names

        all_nodes: list[list[str]] = [[] for _ in range(N)]
        for i in range(N):
            all_nodes[i].append(left_names[i])
            for j in range(n * (n + 1) // 2 - 1):
                all_nodes[i].append(f"{left_names[i]}_{j}_{right_names[i]}")
            all_nodes[i].append(right_names[i])

        progress: list[int] = [0] * N
        result_switches: list[Switch] = []
        # m is called in the order (0, 1, 2, 3), (0, 1, 2), (0, 1), (0)
        if reverse:
            for m_max in range(n)[::-1]:
                for m in range(m_max + 1):
                    M = 2**m
                    for i in range(N):
                        if (i // M) % 2 == 0:
                            result_switches.append(
                                Switch(
                                    left_nodes=frozenset([
                                        all_nodes[i][progress[i]],
                                        all_nodes[i + M][progress[i + M]],
                                    ]),
                                    right_nodes=frozenset([
                                        all_nodes[i][progress[i] + 1],
                                        all_nodes[i + M][progress[i + M] + 1],
                                    ]),
                                )
                            )
                            progress[i] += 1
                            progress[i + M] += 1
            return result_switches
        else:
            for m_max in range(n)[::-1]:
                for m in range(m_max + 1):
                    M = 2**m
                    for i in range(N):
                        if (i // M) % 2 == 0:
                            result_switches.append(
                                Switch(
                                    left_nodes=frozenset([
                                        all_nodes[i][progress[i] + 1],
                                        all_nodes[i + M][progress[i + M] + 1],
                                    ]),
                                    right_nodes=frozenset([
                                        all_nodes[i][progress[i]],
                                        all_nodes[i + M][progress[i + M]],
                                    ]),
                                )
                            )
                            progress[i] += 1
                            progress[i + M] += 1
            return result_switches[::-1]

sparse_qubo.networks.bubble_sort_network

Implementation of Bubble sort network

BubbleSortNetwork

Bases: ISwitchingNetwork

Bubble sort network; works for any variable count.

Source code in src/sparse_qubo/networks/bubble_sort_network.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class BubbleSortNetwork(ISwitchingNetwork):
    """Bubble sort network; works for any variable count."""

    @classmethod
    def _generate_original_network(
        cls,
        left_nodes: list[VariableNode],
        right_nodes: list[VariableNode],
        threshold: int | None = None,
        reverse: bool = False,
    ) -> list[Switch]:
        """Return the list of Switch elements for the bubble sort network."""
        left_names = [node.name for node in left_nodes]
        right_names = [node.name for node in right_nodes]
        if len(left_names) != len(right_names):
            raise ValueError("left_nodes and right_nodes must have the same length")
        N: int = len(left_names)

        all_nodes: list[list[str]] = [[] for _ in range(N)]
        for i in range(N):
            all_nodes[i].append(left_names[i])
            for j in range((N - 1 - i) * 2 if i > 0 else N - 2):
                all_nodes[i].append(f"{left_names[i]}_{j}_{right_names[i]}")
            all_nodes[i].append(right_names[i])

        progress: list[int] = [0] * N
        result_switches: list[Switch] = []
        for i in list(range(1, N)) + list(range(1, N - 1)[::-1]):
            for j in range(0, i, 2):
                k1, k2 = i - j, i - j - 1
                result_switches.append(
                    Switch(
                        left_nodes=frozenset([all_nodes[k1][progress[k1]], all_nodes[k2][progress[k2]]]),
                        right_nodes=frozenset([
                            all_nodes[k1][progress[k1] + 1],
                            all_nodes[k2][progress[k2] + 1],
                        ]),
                    )
                )
                progress[k1] += 1
                progress[k2] += 1
        return result_switches

sparse_qubo.networks.clique_network

Implementation of clique network: single all-to-all switch between left and right variables.

CliqueNetwork

Bases: ISwitchingNetwork

Single switch connecting all left variables to all right variables (clique).

Source code in src/sparse_qubo/networks/clique_network.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class CliqueNetwork(ISwitchingNetwork):
    """Single switch connecting all left variables to all right variables (clique)."""

    @classmethod
    def _generate_original_network(
        cls,
        left_nodes: list[VariableNode],
        right_nodes: list[VariableNode],
        threshold: int | None = None,
        reverse: bool = False,
    ) -> list[Switch]:
        """Return a single Switch with all left and all right variables."""
        left_names = [node.name for node in left_nodes]
        right_names = [node.name for node in right_nodes]
        return [
            Switch(
                left_nodes=frozenset(left_names),
                right_nodes=frozenset(right_names),
            )
        ]

sparse_qubo.networks.clos_network_base

Implementation of Clos network base: three-stage switching network with configurable switch sizes.

ClosNetworkBase

Bases: ISwitchingNetwork, ABC

Base class for Clos-type networks (ingress, middle, egress stages).

Source code in src/sparse_qubo/networks/clos_network_base.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class ClosNetworkBase(ISwitchingNetwork, ABC):
    """Base class for Clos-type networks (ingress, middle, egress stages)."""

    @classmethod
    @abstractmethod
    def _implement_if_small(cls, left_nodes: list[str], right_nodes: list[str]) -> list[Switch] | None:
        """Return a small network for small N, or None to use the full Clos construction."""
        pass

    @classmethod
    @abstractmethod
    def _determine_switch_sizes(cls, N_left: int, N_right: int) -> tuple[int, int]:
        """Return (n, r) for exterior switch size n and number of interior switches r."""
        pass

    @classmethod
    def _generate_original_network(
        cls,
        left_nodes: list[VariableNode],
        right_nodes: list[VariableNode],
        threshold: int | None = None,
        reverse: bool = False,
    ) -> list[Switch]:
        """Build the three-stage Clos network (ingress, middle, egress)."""
        left_names = [node.name for node in left_nodes]
        right_names = [node.name for node in right_nodes]

        if result := cls._implement_if_small(left_names, right_names):
            return result

        left_size: int = len(left_names)
        right_size: int = len(right_names)
        exterior_switch_size, interior_switch_size = cls._determine_switch_sizes(left_size, right_size)
        intermediate_node_size: int = exterior_switch_size * interior_switch_size
        if max(left_size, right_size) > intermediate_node_size:
            raise ValueError("switch size is too small")

        # Generate switches for the ingress stage
        ingress_stage_switches: list[Switch] = []
        ingress_stage_nodes: list[str] = []
        interior_index_start: int
        interior_index_end: int
        interior_node_names: list[str]
        for r in range(interior_switch_size):
            left_index_start: int = r * left_size // interior_switch_size
            left_index_end: int = (r + 1) * left_size // interior_switch_size
            interior_index_start = exterior_switch_size * r
            interior_index_end = exterior_switch_size * (r + 1)

            interior_node_names = [
                f"{left_names[min(i, left_index_end - 1)]}_{i}" for i in range(interior_index_start, interior_index_end)
            ]
            ingress_stage_switches.append(
                Switch(
                    left_nodes=frozenset(left_names[left_index_start:left_index_end]),
                    right_nodes=frozenset(interior_node_names),
                )
            )
            ingress_stage_nodes.extend(interior_node_names)

        # Generate switches for the egress stage
        egress_stage_switches: list[Switch] = []
        egress_stage_nodes: list[str] = []
        for r in range(interior_switch_size):
            right_index_start: int = r * right_size // interior_switch_size
            right_index_end: int = (r + 1) * right_size // interior_switch_size
            interior_index_start = exterior_switch_size * r
            interior_index_end = exterior_switch_size * (r + 1)

            interior_node_names = [
                f"{right_names[min(i, right_index_end - 1)]}_{i}"
                for i in range(interior_index_start, interior_index_end)
            ]
            egress_stage_switches.append(
                Switch(
                    left_nodes=frozenset(interior_node_names),
                    right_nodes=frozenset(right_names[right_index_start:right_index_end]),
                )
            )
            egress_stage_nodes.extend(interior_node_names)

        # # Generate switches for the middle stage
        # middle_stage_switches: list[Switch] = []
        # for i_start in range(exterior_switch_size):
        #     middle_stage_switches.extend(
        #         cls._generate_original_network(
        #             ingress_stage_nodes[i_start:intermediate_node_size:exterior_switch_size],
        #             egress_stage_nodes[i_start:intermediate_node_size:exterior_switch_size],
        #         )
        #     )

        # Generate switches for the middle stage
        middle_stage_switches: list[Switch] = []
        for i_start in range(exterior_switch_size):
            # 1. First, get string slices (sub-lists)
            sub_left_names = ingress_stage_nodes[i_start:intermediate_node_size:exterior_switch_size]
            sub_right_names = egress_stage_nodes[i_start:intermediate_node_size:exterior_switch_size]

            # 2. Convert to VariableNode and make recursive call
            middle_stage_switches.extend(
                cls._generate_original_network(
                    left_nodes=[VariableNode(name=n) for n in sub_left_names],
                    right_nodes=[VariableNode(name=n) for n in sub_right_names],
                    threshold=threshold,
                    reverse=reverse,
                )
            )

        # Connect the three and return
        return ingress_stage_switches + middle_stage_switches + egress_stage_switches

sparse_qubo.networks.clos_network_max_degree

Implementation of Clos network with maximum degree constraint: limits the size of each switch.

AdhocNetworkWithMinimumDegree

Helper for small networks when max degree is large enough (single switch).

Source code in src/sparse_qubo/networks/clos_network_max_degree.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class AdhocNetworkWithMinimumDegree:
    """Helper for small networks when max degree is large enough (single switch)."""

    @classmethod
    def implement_if_small(cls, left_nodes: list[str], right_nodes: list[str], max_degree: int) -> list[Switch]:
        N = max(len(left_nodes), len(right_nodes))
        if N < 2:
            raise ValueError("N must be greater than or equal to 2")
        if max_degree >= N:
            return [
                Switch(
                    left_nodes=frozenset(left_nodes),
                    right_nodes=frozenset(right_nodes),
                )
            ]
        # elif N <= max_degree * 1.5:
        #     first_vars: list[str] = [
        #         f"{left_nodes[i]}_{right_nodes[i]}"
        #         for i in range(math.floor(max_degree / 2))
        #     ]
        #     second_left_vars: list[str] = [
        #         f"{left_nodes[i]}_{i}"
        #         for i in range(math.floor(max_degree / 2), max_degree)
        #     ]
        #     second_right_vars: list[str] = [
        #         f"{right_nodes[i]}_{i}"
        #         for i in range(math.floor(max_degree / 2), max_degree)
        #     ]
        #     return [
        #         Switch(
        #             left_nodes=frozenset(left_nodes[:max_degree]),
        #             right_nodes=frozenset(first_vars + second_left_vars),
        #         ),
        #         Switch(
        #             left_nodes=frozenset(second_left_vars + left_nodes[max_degree:]),
        #             right_nodes=frozenset(second_right_vars + right_nodes[max_degree:]),
        #         ),
        #         Switch(
        #             left_nodes=frozenset(first_vars + second_right_vars),
        #             right_nodes=frozenset(right_nodes[:max_degree]),
        #         ),
        #     ]
        # TODO: Needs confirmation
        else:
            return []

ClosNetworkWithMaxDegree

Bases: ClosNetworkBase

Clos network that limits the maximum degree (switch size). Call reset_max_degree before use.

Source code in src/sparse_qubo/networks/clos_network_max_degree.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class ClosNetworkWithMaxDegree(ClosNetworkBase):
    """Clos network that limits the maximum degree (switch size). Call reset_max_degree before use."""

    num_elements_dict: ClassVar[dict[int, int]] = {}
    max_degree: ClassVar[int] = 5

    @classmethod
    def reset_max_degree(cls, new_max: int) -> None:
        """Set the maximum allowed degree (switch size) for the network."""
        if new_max < 2:
            raise ValueError("new_max must be greater than or equal to 2")

        cls.num_elements_dict = {}
        cls.max_degree = new_max

    # Number of comparators when implementing a network of size N based on (n, r)
    @classmethod
    def _calc_num_elements(cls, N: int, n: int, r: int) -> int:
        interior_cost = cls._get_estimated_cost_and_implementation(r) * n
        exterior_cost = r
        return exterior_cost * 2 + interior_cost

    # Return the number of comparators
    @classmethod
    def _get_estimated_cost_and_implementation(cls, N: int) -> int:
        if N not in cls.num_elements_dict:
            if adhoc_network := cls._implement_if_small([f"L{i}" for i in range(N)], [f"R{i}" for i in range(N)]):
                cls.num_elements_dict[N] = len(adhoc_network)
            else:
                n_opt, r_opt = cls._determine_switch_sizes(N, N)
                cls.num_elements_dict[N] = cls._calc_num_elements(N, n_opt, r_opt)
        return cls.num_elements_dict[N]

    # Return optimal (n, r)
    @classmethod
    def _determine_switch_sizes(cls, N_left: int, N_right: int) -> tuple[int, int]:
        if cls.max_degree is None:
            raise RuntimeError("max_degree is None")

        N = max(N_left, N_right)

        nr_list: list[tuple[int, int]] = [(n, (N + n - 1) // n) for n in range(2, cls.max_degree + 1)]
        n_opt, r_opt = min(nr_list, key=lambda x: cls._calc_num_elements(N, x[0], x[1]))
        return n_opt, r_opt

    # Return implementation for small cases
    @classmethod
    def _implement_if_small(cls, left_nodes: list[str], right_nodes: list[str]) -> list[Switch] | None:
        if cls.max_degree is None:
            raise RuntimeError("max_degree is None")

        return AdhocNetworkWithMinimumDegree.implement_if_small(left_nodes, right_nodes, cls.max_degree)

reset_max_degree(new_max) classmethod

Set the maximum allowed degree (switch size) for the network.

Source code in src/sparse_qubo/networks/clos_network_max_degree.py
63
64
65
66
67
68
69
70
@classmethod
def reset_max_degree(cls, new_max: int) -> None:
    """Set the maximum allowed degree (switch size) for the network."""
    if new_max < 2:
        raise ValueError("new_max must be greater than or equal to 2")

    cls.num_elements_dict = {}
    cls.max_degree = new_max

sparse_qubo.networks.clos_network_minimum_edge

Implementation of Clos network with minimum edge count: chooses (n, r) to minimize quadratic terms.

ClosNetworkMinimumEdge

Bases: ClosNetworkBase

Clos network that minimizes the number of logical edges (quadratic terms).

Source code in src/sparse_qubo/networks/clos_network_minimum_edge.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class ClosNetworkMinimumEdge(ClosNetworkBase):
    """Clos network that minimizes the number of logical edges (quadratic terms)."""

    is_small_dict: ClassVar[dict[int, bool]] = dict.fromkeys(range(3), True)
    num_logical_edges_dict: ClassVar[dict[int, int]] = {0: 0, 1: 0, 2: 6}

    # Number of logical edges when implementing a network of size N based on (n, r)
    @classmethod
    def _calc_num_logical_edges(cls, N: int, n: int, r: int) -> int:
        interior_cost = cls._get_estimated_cost_and_implementation(r)[0] * n
        exterior_cost = 0
        for r_idx in range(r):
            input_start_idx = N * r_idx // r
            input_end_idx = N * (r_idx + 1) // r
            total_nodes = input_end_idx - input_start_idx + n
            exterior_cost += total_nodes * (total_nodes - 1) // 2
        return exterior_cost * 2 + interior_cost

    # Return the number of logical edges and implementation method (whether clique or not)
    @classmethod
    def _get_estimated_cost_and_implementation(cls, N: int) -> tuple[int, bool]:
        if N not in cls.num_logical_edges_dict:
            n_opt, r_opt = cls._determine_switch_sizes(N, N)
            cost_division = cls._calc_num_logical_edges(N, n_opt, r_opt)
            cost_clique = N * (N * 2 - 1)
            cls.is_small_dict[N] = cost_clique <= cost_division
            cls.num_logical_edges_dict[N] = min(cost_clique, cost_division)
        return cls.num_logical_edges_dict[N], cls.is_small_dict[N]

    # Return optimal (n, r)
    @classmethod
    def _determine_switch_sizes(cls, N_left: int, N_right: int) -> tuple[int, int]:
        N = max(N_left, N_right)

        # r = (N + n - 1) // n is the minimum r that satisfies n*r >= N
        nr_list: list[tuple[int, int]] = [(n, (N + n - 1) // n) for n in range(2, N)]
        n_opt, r_opt = min(nr_list, key=lambda x: cls._calc_num_logical_edges(N, x[0], x[1]))
        return n_opt, r_opt

    # Return implementation for small cases
    @classmethod
    def _implement_if_small(cls, left_nodes: list[str], right_nodes: list[str]) -> list[Switch] | None:
        N = max(len(left_nodes), len(right_nodes))
        is_small = cls._get_estimated_cost_and_implementation(N)[1]
        if is_small:
            return [
                Switch(
                    left_nodes=frozenset(left_nodes),
                    right_nodes=frozenset(right_nodes),
                )
            ]
        else:
            return None

sparse_qubo.networks.divide_and_conquer_network

Implementation of Divide-and-conquer network: recursive division of the constraint.

DivideAndConquerNetwork

Bases: ISwitchingNetwork

Divide-and-conquer switching network; uses BubbleSort when size is small or one-hot.

Source code in src/sparse_qubo/networks/divide_and_conquer_network.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class DivideAndConquerNetwork(ISwitchingNetwork):
    """Divide-and-conquer switching network; uses BubbleSort when size is small or one-hot."""

    @classmethod
    def _generate_original_network(
        cls,
        left_nodes: list[VariableNode],
        right_nodes: list[VariableNode],
        threshold: int | None = None,
        reverse: bool = False,
    ) -> list[Switch]:
        """Return the list of Switch elements; recurses or uses BubbleSort for one-hot/small cases."""
        if len(left_nodes) != len(right_nodes):
            raise ValueError("left_nodes and right_nodes must have the same length")

        num_variables: int = len(left_nodes)

        node_dict: defaultdict[NodeAttribute, list[VariableNode]] = defaultdict(list)
        for node in right_nodes:
            node_dict[node.attribute].append(node)

        # TODO: Currently only supports equal to
        if len(node_dict[NodeAttribute.ZERO_OR_ONE]) != 0:
            raise ValueError("ZERO_OR_ONE nodes are not supported")
        if len(node_dict[NodeAttribute.NOT_CARE]) != 0:
            raise ValueError("NOT_CARE nodes are not supported")
        if len(node_dict[NodeAttribute.ALWAYS_ZERO]) + len(node_dict[NodeAttribute.ALWAYS_ONE]) != num_variables:
            raise ValueError("right_nodes must consist only of ALWAYS_ZERO and ALWAYS_ONE nodes")
        if not all(node.attribute == NodeAttribute.ZERO_OR_ONE for node in left_nodes):
            raise ValueError("All left_nodes must have ZERO_OR_ONE attribute")

        switches: list[Switch] = []

        if (
            len(node_dict[NodeAttribute.ALWAYS_ZERO]) == num_variables
            or len(node_dict[NodeAttribute.ALWAYS_ONE]) == num_variables
        ):
            switches.extend([
                Switch(
                    left_nodes=frozenset([left_node.name]),
                    right_nodes=frozenset([right_node.name]),
                )
                for left_node, right_node in zip(left_nodes, right_nodes, strict=True)
            ])
            return switches

        # Case of one-hot
        if len(node_dict[NodeAttribute.ALWAYS_ONE]) == 1:
            switches.extend(
                BubbleSortNetwork._generate_original_network(
                    left_nodes,
                    node_dict[NodeAttribute.ALWAYS_ZERO] + node_dict[NodeAttribute.ALWAYS_ONE],
                )
            )
            return switches
        elif len(node_dict[NodeAttribute.ALWAYS_ZERO]) == 1:
            switches.extend(
                BubbleSortNetwork._generate_original_network(
                    left_nodes,
                    node_dict[NodeAttribute.ALWAYS_ONE] + node_dict[NodeAttribute.ALWAYS_ZERO],
                )
            )
            return switches
        # Other cases
        else:
            if threshold is not None and num_variables <= threshold:
                switches.append(
                    Switch(
                        left_nodes=frozenset([left_node.name for left_node in left_nodes]),
                        right_nodes=frozenset([right_node.name for right_node in right_nodes]),
                    )
                )
                return switches

            aux_nodes: list[VariableNode] = [
                VariableNode(name=f"{left_node.name}_{idx}", attribute=NodeAttribute.ZERO_OR_ONE)
                for idx, left_node in enumerate(left_nodes)
            ]
            for i in range(num_variables // 2):
                switches.append(
                    Switch(
                        left_nodes=frozenset([
                            left_nodes[i].name,
                            left_nodes[i + ceil(num_variables / 2)].name,
                        ]),
                        right_nodes=frozenset(
                            [
                                aux_nodes[i].name,
                                aux_nodes[i + ceil(num_variables / 2)].name,
                            ],
                        ),
                    )
                )
            # For nodes where no swap occurred, assign directly to aux_nodes
            if num_variables % 2 == 1:
                aux_nodes[num_variables // 2] = left_nodes[num_variables // 2]

            switches.extend(
                cls._generate_original_network(
                    left_nodes=aux_nodes[: ceil(num_variables / 2)],
                    right_nodes=node_dict[NodeAttribute.ALWAYS_ONE][
                        : ceil(len(node_dict[NodeAttribute.ALWAYS_ONE]) / 2)
                    ]
                    + node_dict[NodeAttribute.ALWAYS_ZERO][
                        : ceil(num_variables / 2) - ceil(len(node_dict[NodeAttribute.ALWAYS_ONE]) / 2)
                    ],
                    threshold=threshold,
                )
            )
            switches.extend(
                cls._generate_original_network(
                    left_nodes=aux_nodes[ceil(num_variables / 2) :],
                    right_nodes=node_dict[NodeAttribute.ALWAYS_ONE][
                        ceil(len(node_dict[NodeAttribute.ALWAYS_ONE]) / 2) :
                    ]
                    + node_dict[NodeAttribute.ALWAYS_ZERO][
                        ceil(num_variables / 2) - ceil(len(node_dict[NodeAttribute.ALWAYS_ONE]) / 2) :
                    ],
                    threshold=threshold,
                )
            )
            return switches

sparse_qubo.networks.oddeven_merge_sort_network

Implementation of Odd-even merge sort network: Batcher's algorithm.

OddEvenMergeSortNetwork

Bases: ISwitchingNetwork

Odd-even merge sort (Batcher) network; requires power-of-2 variable count.

Source code in src/sparse_qubo/networks/oddeven_merge_sort_network.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class OddEvenMergeSortNetwork(ISwitchingNetwork):
    """Odd-even merge sort (Batcher) network; requires power-of-2 variable count."""

    @classmethod
    def _generate_original_network(
        cls,
        left_nodes: list[VariableNode],
        right_nodes: list[VariableNode],
        threshold: int | None = None,
        reverse: bool = True,
    ) -> list[Switch]:
        """Return the list of Switch elements for the odd-even merge sort network."""
        left_names = [node.name for node in left_nodes]
        right_names = [node.name for node in right_nodes]
        if len(left_names) != len(right_names):
            raise ValueError("left_nodes and right_nodes must have the same length")
        N: int = len(left_names)
        n: int = round(log2(N))
        if 2**n != N:
            raise ValueError("N must be a power of 2")

        if not reverse:
            left_names, right_names = right_names, left_names

        progress: list[int] = [0] * N
        temp_result_switches: list[Switch] = []
        # m is called in the order (0, 1, 2, 3), (0, 1, 2), (0, 1), (0)
        for m_max in range(1, n + 1)[::-1]:
            M_max: int = 2**m_max
            for i_base in range(0, N, M_max):
                for m in range(m_max):
                    M: int = 2**m
                    i_start: int = i_base if m < m_max - 1 else i_base - M
                    i_end: int = i_base + M_max - M
                    for i in range(i_start, i_end):
                        if (i - i_start) // M % 2 == 1:
                            temp_result_switches.append(
                                Switch(
                                    left_nodes=frozenset([
                                        f"{i}_{progress[i]}",
                                        f"{i + M}_{progress[i + M]}",
                                    ]),
                                    right_nodes=frozenset([
                                        f"{i}_{progress[i] + 1}",
                                        f"{i + M}_{progress[i + M] + 1}",
                                    ]),
                                )
                            )
                            progress[i] += 1
                            progress[i + M] += 1
        temp_name_to_result_name: dict[str, str] = {}
        for i in range(N):
            for j in range(progress[i] + 1):
                if j == 0:
                    temp_name_to_result_name[f"{i}_{j}"] = left_names[i]
                elif j == progress[i]:
                    temp_name_to_result_name[f"{i}_{j}"] = right_names[i]
                else:
                    temp_name_to_result_name[f"{i}_{j}"] = f"{left_names[i]}_{j - 1}_{right_names[i]}"
        if reverse:
            return [
                Switch(
                    left_nodes=frozenset([temp_name_to_result_name[left_node] for left_node in switch.left_nodes]),
                    right_nodes=frozenset([temp_name_to_result_name[right_node] for right_node in switch.right_nodes]),
                )
                for switch in temp_result_switches
            ]
        else:
            res = [
                Switch(
                    left_nodes=frozenset([temp_name_to_result_name[left_node] for left_node in switch.right_nodes]),
                    right_nodes=frozenset([temp_name_to_result_name[right_node] for right_node in switch.left_nodes]),
                )
                for switch in temp_result_switches
            ]
            return res[::-1]

D-Wave integration

Public entry point: sparse_qubo.create_constraint_dwave. Implementation module:

sparse_qubo.dwave.constraint

D-Wave/dimod integration: build dimod BQMs from constraint types and network types.

Use create_constraint_dwave (from sparse_qubo) or constraint() from this module to obtain a BinaryQuadraticModel for use with D-Wave samplers or dimod.

constraint(variables, constraint_type, network_type=NetworkType.DIVIDE_AND_CONQUER, c1=None, c2=None, threshold=None)

Build a dimod BQM for the given constraint using the specified network type (or NAIVE).

Source code in src/sparse_qubo/dwave/constraint.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def constraint(
    variables: dimod.variables.Variables,
    constraint_type: ConstraintType,
    network_type: NetworkType = NetworkType.DIVIDE_AND_CONQUER,
    c1: int | None = None,
    c2: int | None = None,
    threshold: int | None = None,
) -> dimod.BinaryQuadraticModel:
    """Build a dimod BQM for the given constraint using the specified network type (or NAIVE)."""
    if network_type == NetworkType.NAIVE:
        bqm = naive_constraint(variables, constraint_type, c1, c2)
        return bqm

    variable_names = [str(v) for v in variables]
    qubo = get_constraint_qubo(variable_names, constraint_type, network_type, c1, c2, threshold)
    bqm = dimod.BinaryQuadraticModel(
        qubo.linear,
        qubo.quadratic,
        qubo.constant,
        dimod.BINARY,
    )
    return bqm

naive_constraint(variables, constraint_type, c1=None, c2=None)

Encode the constraint as a single linear equality/inequality (no switching network).

Source code in src/sparse_qubo/dwave/constraint.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def naive_constraint(
    variables: dimod.variables.Variables,
    constraint_type: ConstraintType,
    c1: int | None = None,
    c2: int | None = None,
) -> dimod.BinaryQuadraticModel:
    """Encode the constraint as a single linear equality/inequality (no switching network)."""
    size = len(variables)
    terms = [(str(v), 1) for v in variables]
    bqm = dimod.BinaryQuadraticModel(dimod.BINARY)
    lagrange_multiplier = 1
    match constraint_type:
        case ConstraintType.ONE_HOT:
            bqm.add_linear_equality_constraint(terms, lagrange_multiplier, -1)
        case ConstraintType.EQUAL_TO:
            if not (c1 is not None and 0 <= c1 <= size):
                raise ValueError("c1 must be between 0 and size")
            bqm.add_linear_equality_constraint(terms, lagrange_multiplier, -c1)
        case ConstraintType.LESS_EQUAL:
            if not (c1 is not None and 0 <= c1 <= size):
                raise ValueError("c1 must be between 0 and size")
            bqm.add_linear_inequality_constraint(terms, lagrange_multiplier, label="s", ub=c1)
        case ConstraintType.GREATER_EQUAL:
            if not (c1 is not None and 0 <= c1 <= size):
                raise ValueError("c1 must be between 0 and size")
            bqm.add_linear_inequality_constraint(terms, lagrange_multiplier, label="s", lb=c1, ub=size)
        case ConstraintType.CLAMP:
            if not (c1 is not None and c2 is not None and 0 <= c1 <= c2 <= size):
                raise ValueError("c1 and c2 must be between 0 and size")
            bqm.add_linear_inequality_constraint(terms, lagrange_multiplier, label="s", lb=c1, ub=c2)
        case _:
            raise NotImplementedError

    return bqm

Fixstars Amplify integration

Public entry point: sparse_qubo.create_constraint_amplify. Implementation module:

sparse_qubo.fixstars_amplify.constraint

Fixstars Amplify integration: build Amplify models from constraint types and network types.

Use create_constraint_amplify (from sparse_qubo) or constraint() from this module to obtain an amplify.Model for use with Amplify solvers.

constraint(variables, constraint_type, network_type=NetworkType.DIVIDE_AND_CONQUER, c1=None, c2=None, threshold=None)

Build an Amplify model for the given constraint using the specified network type (or NAIVE).

Source code in src/sparse_qubo/fixstars_amplify/constraint.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def constraint(
    variables: list[amplify.Variable],
    constraint_type: ConstraintType,
    network_type: NetworkType = NetworkType.DIVIDE_AND_CONQUER,
    c1: int | None = None,
    c2: int | None = None,
    threshold: int | None = None,
) -> amplify.Model:
    """Build an Amplify model for the given constraint using the specified network type (or NAIVE)."""
    if network_type == NetworkType.NAIVE:
        model = naive_constraint(variables, constraint_type, c1, c2)
        return model

    variable_names = [v.name for v in variables]
    qubo = get_constraint_qubo(variable_names, constraint_type, network_type, c1, c2, threshold)
    model = generate_amplify_model(variables, qubo)
    return model

generate_amplify_model(variables, qubo)

Build an Amplify model from a list of Amplify variables and a QUBO (linear + quadratic + constant).

Source code in src/sparse_qubo/fixstars_amplify/constraint.py
48
49
50
51
52
53
54
55
56
57
58
59
def generate_amplify_model(variables: list[amplify.Variable], qubo: QUBO) -> amplify.Model:
    """Build an Amplify model from a list of Amplify variables and a QUBO (linear + quadratic + constant)."""
    poly_map = {v.name: amplify.Poly(v) for v in variables}
    objectives = amplify.Poly(0)

    for (v1, v2), coef in qubo.quadratic.items():
        objectives += coef * poly_map[v1] * poly_map[v2]
    for v, coef in qubo.linear.items():
        objectives += coef * poly_map[v]
    objectives += qubo.constant

    return amplify.Model(objectives)

naive_constraint(variables, constraint_type, c1=None, c2=None)

Encode the constraint using Amplify's built-in one_hot/equal_to/less_equal/etc. (no switching network).

Source code in src/sparse_qubo/fixstars_amplify/constraint.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def naive_constraint(
    variables: list[amplify.Variable],
    constraint_type: ConstraintType,
    c1: int | None = None,
    c2: int | None = None,
) -> amplify.Model:
    """Encode the constraint using Amplify's built-in one_hot/equal_to/less_equal/etc. (no switching network)."""
    size = len(variables)
    sum_poly: amplify.Poly = sum([amplify.Poly(v) for v in variables], amplify.Poly(0))
    match constraint_type:
        case ConstraintType.ONE_HOT:
            constraint = amplify.one_hot(sum_poly)
        case ConstraintType.EQUAL_TO:
            if not (c1 is not None and 0 <= c1 <= size):
                raise ValueError("c1 must be between 0 and size")
            constraint = amplify.equal_to(sum_poly, c1)
        case ConstraintType.LESS_EQUAL:
            if not (c1 is not None and 0 <= c1 <= size):
                raise ValueError("c1 must be between 0 and size")
            constraint = amplify.less_equal(sum_poly, c1)
        case ConstraintType.GREATER_EQUAL:
            if not (c1 is not None and 0 <= c1 <= size):
                raise ValueError("c1 must be between 0 and size")
            constraint = amplify.greater_equal(sum_poly, c1)
        case ConstraintType.CLAMP:
            if not (c1 is not None and c2 is not None and 0 <= c1 <= c2 <= size):
                raise ValueError("c1 and c2 must be between 0 and size")
            constraint = amplify.clamp(sum_poly, (c1, c2))
        case _:
            raise NotImplementedError

    return amplify.Model(constraint)